[09/41] couch-mrview commit: updated refs/heads/master to 28e51f3

couch_mrview: couch_mrview_changes:handle_changes

Similar to couch_changes:handle_changes but for view changes. It add
support for longpolling, normal and continuous stream The API differs
from the one for doc by beeing independant from the transport: the
support of HTTP will be added on top for example.

This API will be also used to replace the view filter in the current _changes

Also add unittests.


Branch: refs/heads/master
Commit: 1c24c425f2ec9fa63b0e01a13673af234043ee30
Parents: 18b5f6f
Author: benoitc <>
Authored: Fri Jan 31 13:13:23 2014 +0100
Committer: Benjamin Bastian <>
Committed: Thu Oct 30 13:38:33 2014 -0700

 src/couch_mrview_changes.erl   | 173 ++++++++++++++++++++++++++++++++
 src/couch_mrview_test_util.erl |   2 +
 test/09-index-events.t         |  17 +++-
 test/10-index-changes.t        | 194 ++++++++++++++++++++++++++++++++++++
 4 files changed, 385 insertions(+), 1 deletion(-)
diff --git a/src/couch_mrview_changes.erl b/src/couch_mrview_changes.erl
new file mode 100644
index 0000000..2b8f910
--- /dev/null
+++ b/src/couch_mrview_changes.erl
@@ -0,0 +1,173 @@
+-record(vst, {dbname,
+              ddoc,
+              view,
+              view_options,
+              since,
+              callback,
+              acc,
+              user_timeout,
+              timeout,
+              heartbeat,
+              timeout_acc=0,
+              notifier,
+              stream}).
+-type changes_stream() :: true | false | once.
+-type changes_options() :: [{stream, changes_stream()} |
+                            {since, integer()} |
+                            {view_options, list()} |
+                            {timeout, integer()} |
+                            {heartbeat, true | integer()}].
+%% @doc function returning changes in a streaming fashion if needed.
+-spec handle_changes(binary(), binary(), binary(), function(), term(),
+                     changes_options()) -> ok | {error, term()}.
+handle_changes(DbName, DDocId, View, Fun, Acc, Options) ->
+    Since = proplists:get_value(since, Options, 0),
+    Stream = proplists:get_value(stream, Options, false),
+    ViewOptions = proplists:get_value(view_options, Options, []),
+    State0 = #vst{dbname=DbName,
+                  ddoc=DDocId,
+                  view=View,
+                  view_options=ViewOptions,
+                  since=Since,
+                  callback=Fun,
+                  acc=Acc},
+    case view_changes_since(State0) of
+        {ok, #vst{since=LastSeq, acc=Acc2}=State} ->
+            case Stream of
+                true ->
+                    start_loop(State#vst{stream=true}, Options);
+                once when LastSeq =:= Since ->
+                    start_loop(State#vst{stream=once}, Options);
+                _ ->
+                    Fun(stop, {LastSeq, Acc2})
+            end;
+        {stop, #vst{since=LastSeq, acc=Acc2}} ->
+            Fun(stop, {LastSeq, Acc2});
+        Error ->
+            Error
+    end.
+start_loop(#vst{dbname=DbName, ddoc=DDocId}=State, Options) ->
+    {UserTimeout, Timeout, Heartbeat} = changes_timeout(Options),
+    Notifier = index_update_notifier(DbName, DDocId),
+    try
+        loop(State#vst{notifier=Notifier,
+                       user_timeout=UserTimeout,
+                       timeout=Timeout,
+                       heartbeat=Heartbeat})
+    after
+        couch_index_event:stop(Notifier)
+    end.
+loop(#vst{since=Since, callback=Callback, acc=Acc,
+          user_timeout=UserTimeout, timeout=Timeout,
+          heartbeat=Heartbeat, timeout_acc=TimeoutAcc,
+          stream=Stream}=State) ->
+    receive
+        index_update ->
+            case view_changes_since(State) of
+                {ok, State2} when Stream =:= true ->
+                    loop(State2#vst{timeout_acc=0});
+                {ok, #vst{since=LastSeq, acc=Acc2}} ->
+                    Callback(stop, {LastSeq, Acc2});
+                {stop, #vst{since=LastSeq, acc=Acc2}} ->
+                    Callback(stop, {LastSeq, Acc2})
+            end;
+        index_delete ->
+            Callback(stop, {Since, Acc})
+    after Timeout ->
+            TimeoutAcc2 = TimeoutAcc + Timeout,
+            case UserTimeout =< TimeoutAcc2 of
+                true ->
+                    Callback(stop, {Since, Acc});
+                false when Heartbeat =:= true ->
+                    case Callback(heartbeat, Acc) of
+                        {ok, Acc2} ->
+                            loop(State#vst{acc=Acc2, timeout_acc=TimeoutAcc2});
+                        {stop, Acc2} ->
+                            Callback(stop, {Since, Acc2})
+                    end;
+                _ ->
+                    Callback(stop, {Since, Acc})
+            end
+    end.
+changes_timeout(Options) ->
+    DefaultTimeout = list_to_integer(
+            couch_config:get("httpd", "changes_timeout", "60000")
+    ),
+    UserTimeout = proplists:get_value(timeout, Options, DefaultTimeout),
+    {Timeout, Heartbeat} = case proplists:get_value(heartbeat, Options) of
+        undefined -> {UserTimeout, false};
+        true ->
+            T = erlang:min(DefaultTimeout, UserTimeout),
+            {T, true};
+        H ->
+            T = erlang:min(H, UserTimeout),
+            {T, true}
+    end,
+    {UserTimeout, Timeout, Heartbeat}.
+view_changes_since(#vst{dbname=DbName, ddoc=DDocId, view=View,
+                        view_options=Options, since=Since,
+                        callback=Callback, acc=UserAcc}=State) ->
+    Wrapper = fun ({{Seq, _Key, _DocId}, _Val}=KV, {Go, Acc2, OldSeq}) ->
+            LastSeq = if OldSeq < Seq -> Seq;
+                true -> OldSeq
+            end,
+            case Callback(KV, Acc2) of
+                {ok, Acc3} -> {ok, {Go, Acc3, LastSeq}};
+                {stop, Acc3} -> {stop, {stop, Acc3, LastSeq}}
+            end
+    end,
+    Acc0 = {ok, UserAcc, Since},
+    case couch_mrview:view_changes_since(DbName, DDocId, View, Since,
+                                         Wrapper, Options, Acc0) of
+        {ok, {Go, UserAcc2, Since2}}->
+            {Go, State#vst{since=Since2, acc=UserAcc2}};
+        Error ->
+            Error
+    end.
+index_update_notifier(#db{name=DbName}, DDocId) ->
+    index_update_notifier(DbName, DDocId);
+index_update_notifier(DbName, DDocId) ->
+    Self = self(),
+    {ok, NotifierPid} = couch_index_event:start_link(fun
+                ({index_update, {Name, Id, couch_mrview_index}})
+                        when Name =:= DbName, Id =:= DDocId ->
+                    Self ! index_update;
+                ({index_delete, {Name, Id, couch_mrview_index}})
+                        when Name =:= DbName, Id =:= DDocId ->
+                    Self ! index_delete;
+                (_) ->
+                    ok
+            end),
+    NotifierPid.
diff --git a/src/couch_mrview_test_util.erl b/src/couch_mrview_test_util.erl
index c68010c..1d3d788 100644
--- a/src/couch_mrview_test_util.erl
+++ b/src/couch_mrview_test_util.erl
@@ -33,6 +33,8 @@ new_db(Name, Type) ->
     {ok, Db} = couch_db:create(Name, [?ADMIN_USER]),
     save_docs(Db, [ddoc(Type)]).
+delete_db(Name) ->
+    couch_server:delete(Name, [{user_ctx, ?ADMIN}]).
 save_docs(Db, Docs) ->
     {ok, _} = couch_db:update_docs(Db, Docs, []),
diff --git a/test/09-index-events.t b/test/09-index-events.t
index 90654b8..1489e4e 100644
--- a/test/09-index-events.t
+++ b/test/09-index-events.t
@@ -15,7 +15,7 @@
 % the License.
 main(_) ->
-    etap:plan(2),
+    etap:plan(4),
     case (catch test()) of
         ok ->
@@ -30,6 +30,7 @@ test() ->
     {ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes),
+    test_delete_event(Db),
@@ -44,3 +45,17 @@ test_update_event(Db) ->
             etap:is(Event, Expect, "index update events OK")
+test_delete_event(Db) ->
+     ok = couch_mrview:refresh(Db, <<"_design/bar">>),
+    {ok, Pid} = couch_index_event:start_link(self()),
+    etap:ok(is_pid(Pid), "event handler added"),
+    couch_mrview_test_util:delete_db(<<"foo">>),
+    Expect = {index_delete, {<<"foo">>, <<"_design/bar">>,
+                             couch_mrview_index}},
+    receive
+        Event ->
+            etap:is(Event, Expect, "index delete events OK")
+    end,
+    couch_index_event:stop(Pid).
diff --git a/test/10-index-changes.t b/test/10-index-changes.t
new file mode 100644
index 0000000..627376f
--- /dev/null
+++ b/test/10-index-changes.t
@@ -0,0 +1,194 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap
+main(_) ->
+    etap:plan(6),
+    case (catch test()) of
+        ok ->
+            etap:end_tests();
+        Other ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+            etap:bail(Other)
+    end,
+    timer:sleep(300),
+    ok.
+test() ->
+    test_util:start_couch(),
+    {ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes),
+    test_normal_changes(Db),
+    test_stream_once(Db),
+    test_stream_once_since(Db),
+    test_stream_once_timeout(Db),
+    test_stream_once_heartbeat(Db),
+    test_stream(Db),
+    test_util:stop_couch(),
+    ok.
+test_normal_changes(Db) ->
+    Result = run_query(Db, []),
+    Expect = {ok, 11, [
+                {{2, 1, <<"1">>}, 1},
+                {{3, 10, <<"10">>}, 10},
+                {{4, 2, <<"2">>}, 2},
+                {{5, 3, <<"3">>}, 3},
+                {{6, 4, <<"4">>}, 4},
+                {{7, 5, <<"5">>}, 5},
+                {{8, 6, <<"6">>}, 6},
+                {{9, 7, <<"7">>}, 7},
+                {{10, 8, <<"8">>}, 8},
+                {{11, 9, <<"9">>}, 9}
+    ]},
+    etap:is(Result, Expect, "normal changes worked.").
+test_stream_once(Db) ->
+    Result = run_query(Db, [{stream, once}]),
+    Expect = {ok, 11, [
+                {{2, 1, <<"1">>}, 1},
+                {{3, 10, <<"10">>}, 10},
+                {{4, 2, <<"2">>}, 2},
+                {{5, 3, <<"3">>}, 3},
+                {{6, 4, <<"4">>}, 4},
+                {{7, 5, <<"5">>}, 5},
+                {{8, 6, <<"6">>}, 6},
+                {{9, 7, <<"7">>}, 7},
+                {{10, 8, <<"8">>}, 8},
+                {{11, 9, <<"9">>}, 9}
+    ]},
+    etap:is(Result, Expect, "stream once since 0 worked.").
+test_stream_once_since(Db) ->
+    Self = self(),
+    spawn(fun() ->
+                Result = run_query(Db, [{since, 11},
+                                        {stream, once}]),
+                Self ! {result, Result}
+        end),
+    spawn(fun() ->
+                timer:sleep(1000),
+                {ok, Db1} = save_doc(Db, 11),
+                couch_mrview:refresh(Db1, <<"_design/bar">>)
+        end),
+    Expect = {ok,12,[{{12,11,<<"11">>},11}]},
+    receive
+        {result, Result} ->
+            etap:is(Result, Expect, "normal changes worked.")
+    after 5000 ->
+            io:format("never got the change", [])
+    end.
+test_stream_once_timeout(Db) ->
+    Self = self(),
+    spawn(fun() ->
+                Result = run_query(Db, [{since, 12},
+                                        {stream, once},
+                                        {timeout, 3000}]),
+                Self ! {result, Result}
+        end),
+    Expect = {ok, 12, []},
+    receive
+        {result, Result} ->
+            etap:is(Result, Expect, "got timeout.")
+    after 5000 ->
+            io:format("never got the change", [])
+    end.
+test_stream_once_heartbeat(Db) ->
+    Self = self(),
+    spawn(fun() ->
+                Result = run_query(Db, [{since, 12},
+                                        {stream, once},
+                                        {heartbeat, 1000}]),
+                Self ! {result, Result}
+        end),
+    spawn(fun() ->
+                timer:sleep(3000),
+                {ok, Db1} = save_doc(Db, 12),
+                couch_mrview:refresh(Db1, <<"_design/bar">>)
+        end),
+    Expect = {ok,13,[heartbeat,
+                     heartbeat,
+                     heartbeat,
+                     {{13,12,<<"12">>},12}]},
+    receive
+        {result, Result} ->
+            etap:is(Result, Expect, "heartbeat OK.")
+    after 5000 ->
+            io:format("never got the change", [])
+    end.
+test_stream(Db) ->
+    Self = self(),
+    spawn(fun() ->
+                Result = run_query(Db, [{since, 13},
+                                        stream,
+                                        {timeout, 3000}]),
+                Self ! {result, Result}
+        end),
+    spawn(fun() ->
+                timer:sleep(1000),
+                {ok, Db1} = save_doc(Db, 13),
+                couch_mrview:refresh(Db1, <<"_design/bar">>),
+                {ok, Db2} = save_doc(Db1, 14),
+                couch_mrview:refresh(Db2, <<"_design/bar">>)
+        end),
+    Expect = {ok, 15,[{{14,13,<<"13">>},13},
+                     {{15,14,<<"14">>},14}]},
+    receive
+        {result, Result} ->
+            etap:is(Result, Expect, "stream OK.")
+    after 5000 ->
+            io:format("never got the change", [])
+    end.
+save_doc(Db, Id) ->
+    Doc = couch_mrview_test_util:doc(Id),
+    {ok, _Rev} = couch_db:update_doc(Db, Doc, []),
+    {ok, _} =  couch_db:ensure_full_commit(Db),
+    couch_db:reopen(Db).
+run_query(Db, Opts) ->
+    Fun = fun
+        (stop, {LastSeq, Acc}) ->
+            {ok, LastSeq, Acc};
+        (heartbeat, Acc) ->
+            {ok, [heartbeat | Acc]};
+        (Event, Acc) ->
+            {ok, [Event | Acc]}
+    end,
+    couch_mrview:refresh(Db, <<"_design/bar">>),
+    {ok, LastSeq, R} = couch_mrview_changes:handle_changes(Db, <<"_design/bar">>,
+                                                  <<"baz">>, Fun, [], Opts),
+    {ok, LastSeq, lists:reverse(R)}.