You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by bb...@apache.org on 2014/10/31 20:53:24 UTC
[09/41] couch-mrview commit: updated refs/heads/master to 28e51f3
couch_mrview: couch_mrview_changes:handle_changes
Similar to couch_changes:handle_changes but for view changes. It add
support for longpolling, normal and continuous stream The API differs
from the one for doc by beeing independant from the transport: the
support of HTTP will be added on top for example.
This API will be also used to replace the view filter in the current _changes
API.
Also add unittests.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/commit/1c24c425
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/tree/1c24c425
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/diff/1c24c425
Branch: refs/heads/master
Commit: 1c24c425f2ec9fa63b0e01a13673af234043ee30
Parents: 18b5f6f
Author: benoitc <be...@apache.org>
Authored: Fri Jan 31 13:13:23 2014 +0100
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Thu Oct 30 13:38:33 2014 -0700
----------------------------------------------------------------------
src/couch_mrview_changes.erl | 173 ++++++++++++++++++++++++++++++++
src/couch_mrview_test_util.erl | 2 +
test/09-index-events.t | 17 +++-
test/10-index-changes.t | 194 ++++++++++++++++++++++++++++++++++++
4 files changed, 385 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/1c24c425/src/couch_mrview_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview_changes.erl b/src/couch_mrview_changes.erl
new file mode 100644
index 0000000..2b8f910
--- /dev/null
+++ b/src/couch_mrview_changes.erl
@@ -0,0 +1,173 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+%
+-module(couch_mrview_changes).
+
+-export([handle_changes/6]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-record(vst, {dbname,
+ ddoc,
+ view,
+ view_options,
+ since,
+ callback,
+ acc,
+ user_timeout,
+ timeout,
+ heartbeat,
+ timeout_acc=0,
+ notifier,
+ stream}).
+
+-type changes_stream() :: true | false | once.
+-type changes_options() :: [{stream, changes_stream()} |
+ {since, integer()} |
+ {view_options, list()} |
+ {timeout, integer()} |
+ {heartbeat, true | integer()}].
+
+-export_type([changes_stream/0]).
+-export_type([changes_options/0]).
+
+%% @doc function returning changes in a streaming fashion if needed.
+-spec handle_changes(binary(), binary(), binary(), function(), term(),
+ changes_options()) -> ok | {error, term()}.
+handle_changes(DbName, DDocId, View, Fun, Acc, Options) ->
+ Since = proplists:get_value(since, Options, 0),
+ Stream = proplists:get_value(stream, Options, false),
+ ViewOptions = proplists:get_value(view_options, Options, []),
+
+ State0 = #vst{dbname=DbName,
+ ddoc=DDocId,
+ view=View,
+ view_options=ViewOptions,
+ since=Since,
+ callback=Fun,
+ acc=Acc},
+
+ case view_changes_since(State0) of
+ {ok, #vst{since=LastSeq, acc=Acc2}=State} ->
+ case Stream of
+ true ->
+ start_loop(State#vst{stream=true}, Options);
+ once when LastSeq =:= Since ->
+ start_loop(State#vst{stream=once}, Options);
+ _ ->
+ Fun(stop, {LastSeq, Acc2})
+ end;
+ {stop, #vst{since=LastSeq, acc=Acc2}} ->
+ Fun(stop, {LastSeq, Acc2});
+ Error ->
+ Error
+ end.
+
+start_loop(#vst{dbname=DbName, ddoc=DDocId}=State, Options) ->
+ {UserTimeout, Timeout, Heartbeat} = changes_timeout(Options),
+ Notifier = index_update_notifier(DbName, DDocId),
+ try
+ loop(State#vst{notifier=Notifier,
+ user_timeout=UserTimeout,
+ timeout=Timeout,
+ heartbeat=Heartbeat})
+ after
+ couch_index_event:stop(Notifier)
+ end.
+
+loop(#vst{since=Since, callback=Callback, acc=Acc,
+ user_timeout=UserTimeout, timeout=Timeout,
+ heartbeat=Heartbeat, timeout_acc=TimeoutAcc,
+ stream=Stream}=State) ->
+ receive
+ index_update ->
+ case view_changes_since(State) of
+ {ok, State2} when Stream =:= true ->
+ loop(State2#vst{timeout_acc=0});
+ {ok, #vst{since=LastSeq, acc=Acc2}} ->
+ Callback(stop, {LastSeq, Acc2});
+ {stop, #vst{since=LastSeq, acc=Acc2}} ->
+ Callback(stop, {LastSeq, Acc2})
+ end;
+ index_delete ->
+ Callback(stop, {Since, Acc})
+ after Timeout ->
+ TimeoutAcc2 = TimeoutAcc + Timeout,
+ case UserTimeout =< TimeoutAcc2 of
+ true ->
+ Callback(stop, {Since, Acc});
+ false when Heartbeat =:= true ->
+ case Callback(heartbeat, Acc) of
+ {ok, Acc2} ->
+ loop(State#vst{acc=Acc2, timeout_acc=TimeoutAcc2});
+ {stop, Acc2} ->
+ Callback(stop, {Since, Acc2})
+ end;
+ _ ->
+ Callback(stop, {Since, Acc})
+ end
+ end.
+
+changes_timeout(Options) ->
+ DefaultTimeout = list_to_integer(
+ couch_config:get("httpd", "changes_timeout", "60000")
+ ),
+ UserTimeout = proplists:get_value(timeout, Options, DefaultTimeout),
+ {Timeout, Heartbeat} = case proplists:get_value(heartbeat, Options) of
+ undefined -> {UserTimeout, false};
+ true ->
+ T = erlang:min(DefaultTimeout, UserTimeout),
+ {T, true};
+ H ->
+ T = erlang:min(H, UserTimeout),
+ {T, true}
+ end,
+ {UserTimeout, Timeout, Heartbeat}.
+
+view_changes_since(#vst{dbname=DbName, ddoc=DDocId, view=View,
+ view_options=Options, since=Since,
+ callback=Callback, acc=UserAcc}=State) ->
+ Wrapper = fun ({{Seq, _Key, _DocId}, _Val}=KV, {Go, Acc2, OldSeq}) ->
+ LastSeq = if OldSeq < Seq -> Seq;
+ true -> OldSeq
+ end,
+
+ case Callback(KV, Acc2) of
+ {ok, Acc3} -> {ok, {Go, Acc3, LastSeq}};
+ {stop, Acc3} -> {stop, {stop, Acc3, LastSeq}}
+ end
+ end,
+
+ Acc0 = {ok, UserAcc, Since},
+ case couch_mrview:view_changes_since(DbName, DDocId, View, Since,
+ Wrapper, Options, Acc0) of
+ {ok, {Go, UserAcc2, Since2}}->
+ {Go, State#vst{since=Since2, acc=UserAcc2}};
+ Error ->
+ Error
+ end.
+
+index_update_notifier(#db{name=DbName}, DDocId) ->
+ index_update_notifier(DbName, DDocId);
+index_update_notifier(DbName, DDocId) ->
+ Self = self(),
+ {ok, NotifierPid} = couch_index_event:start_link(fun
+ ({index_update, {Name, Id, couch_mrview_index}})
+ when Name =:= DbName, Id =:= DDocId ->
+ Self ! index_update;
+ ({index_delete, {Name, Id, couch_mrview_index}})
+ when Name =:= DbName, Id =:= DDocId ->
+ Self ! index_delete;
+ (_) ->
+ ok
+ end),
+ NotifierPid.
http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/1c24c425/src/couch_mrview_test_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview_test_util.erl b/src/couch_mrview_test_util.erl
index c68010c..1d3d788 100644
--- a/src/couch_mrview_test_util.erl
+++ b/src/couch_mrview_test_util.erl
@@ -33,6 +33,8 @@ new_db(Name, Type) ->
{ok, Db} = couch_db:create(Name, [?ADMIN_USER]),
save_docs(Db, [ddoc(Type)]).
+delete_db(Name) ->
+ couch_server:delete(Name, [{user_ctx, ?ADMIN}]).
save_docs(Db, Docs) ->
{ok, _} = couch_db:update_docs(Db, Docs, []),
http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/1c24c425/test/09-index-events.t
----------------------------------------------------------------------
diff --git a/test/09-index-events.t b/test/09-index-events.t
index 90654b8..1489e4e 100644
--- a/test/09-index-events.t
+++ b/test/09-index-events.t
@@ -15,7 +15,7 @@
% the License.
main(_) ->
- etap:plan(2),
+ etap:plan(4),
case (catch test()) of
ok ->
etap:end_tests();
@@ -30,6 +30,7 @@ test() ->
test_util:start_couch(),
{ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes),
test_update_event(Db),
+ test_delete_event(Db),
test_util:stop_couch(),
ok.
@@ -44,3 +45,17 @@ test_update_event(Db) ->
etap:is(Event, Expect, "index update events OK")
end,
couch_index_event:stop(Pid).
+
+test_delete_event(Db) ->
+ ok = couch_mrview:refresh(Db, <<"_design/bar">>),
+ {ok, Pid} = couch_index_event:start_link(self()),
+
+ etap:ok(is_pid(Pid), "event handler added"),
+ couch_mrview_test_util:delete_db(<<"foo">>),
+ Expect = {index_delete, {<<"foo">>, <<"_design/bar">>,
+ couch_mrview_index}},
+ receive
+ Event ->
+ etap:is(Event, Expect, "index delete events OK")
+ end,
+ couch_index_event:stop(Pid).
http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/1c24c425/test/10-index-changes.t
----------------------------------------------------------------------
diff --git a/test/10-index-changes.t b/test/10-index-changes.t
new file mode 100644
index 0000000..627376f
--- /dev/null
+++ b/test/10-index-changes.t
@@ -0,0 +1,194 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+%%! -pa ./deps/*/ebin -pa ./apps/*/ebin -pa ./test/etap
+
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+main(_) ->
+ etap:plan(6),
+ case (catch test()) of
+ ok ->
+ etap:end_tests();
+ Other ->
+ etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
+ etap:bail(Other)
+ end,
+ timer:sleep(300),
+ ok.
+
+test() ->
+ test_util:start_couch(),
+ {ok, Db} = couch_mrview_test_util:init_db(<<"foo">>, changes),
+ test_normal_changes(Db),
+ test_stream_once(Db),
+ test_stream_once_since(Db),
+ test_stream_once_timeout(Db),
+ test_stream_once_heartbeat(Db),
+ test_stream(Db),
+ test_util:stop_couch(),
+ ok.
+
+test_normal_changes(Db) ->
+ Result = run_query(Db, []),
+ Expect = {ok, 11, [
+ {{2, 1, <<"1">>}, 1},
+ {{3, 10, <<"10">>}, 10},
+ {{4, 2, <<"2">>}, 2},
+ {{5, 3, <<"3">>}, 3},
+ {{6, 4, <<"4">>}, 4},
+ {{7, 5, <<"5">>}, 5},
+ {{8, 6, <<"6">>}, 6},
+ {{9, 7, <<"7">>}, 7},
+ {{10, 8, <<"8">>}, 8},
+ {{11, 9, <<"9">>}, 9}
+ ]},
+ etap:is(Result, Expect, "normal changes worked.").
+
+test_stream_once(Db) ->
+ Result = run_query(Db, [{stream, once}]),
+ Expect = {ok, 11, [
+ {{2, 1, <<"1">>}, 1},
+ {{3, 10, <<"10">>}, 10},
+ {{4, 2, <<"2">>}, 2},
+ {{5, 3, <<"3">>}, 3},
+ {{6, 4, <<"4">>}, 4},
+ {{7, 5, <<"5">>}, 5},
+ {{8, 6, <<"6">>}, 6},
+ {{9, 7, <<"7">>}, 7},
+ {{10, 8, <<"8">>}, 8},
+ {{11, 9, <<"9">>}, 9}
+ ]},
+ etap:is(Result, Expect, "stream once since 0 worked.").
+
+
+test_stream_once_since(Db) ->
+ Self = self(),
+ spawn(fun() ->
+ Result = run_query(Db, [{since, 11},
+ {stream, once}]),
+ Self ! {result, Result}
+ end),
+
+ spawn(fun() ->
+ timer:sleep(1000),
+ {ok, Db1} = save_doc(Db, 11),
+ couch_mrview:refresh(Db1, <<"_design/bar">>)
+ end),
+
+ Expect = {ok,12,[{{12,11,<<"11">>},11}]},
+
+ receive
+ {result, Result} ->
+ etap:is(Result, Expect, "normal changes worked.")
+ after 5000 ->
+ io:format("never got the change", [])
+ end.
+
+
+test_stream_once_timeout(Db) ->
+ Self = self(),
+ spawn(fun() ->
+ Result = run_query(Db, [{since, 12},
+ {stream, once},
+ {timeout, 3000}]),
+ Self ! {result, Result}
+ end),
+
+
+
+ Expect = {ok, 12, []},
+
+ receive
+ {result, Result} ->
+ etap:is(Result, Expect, "got timeout.")
+ after 5000 ->
+ io:format("never got the change", [])
+ end.
+
+test_stream_once_heartbeat(Db) ->
+ Self = self(),
+ spawn(fun() ->
+ Result = run_query(Db, [{since, 12},
+ {stream, once},
+ {heartbeat, 1000}]),
+ Self ! {result, Result}
+ end),
+
+ spawn(fun() ->
+ timer:sleep(3000),
+ {ok, Db1} = save_doc(Db, 12),
+ couch_mrview:refresh(Db1, <<"_design/bar">>)
+ end),
+
+ Expect = {ok,13,[heartbeat,
+ heartbeat,
+ heartbeat,
+ {{13,12,<<"12">>},12}]},
+
+
+
+ receive
+ {result, Result} ->
+ etap:is(Result, Expect, "heartbeat OK.")
+ after 5000 ->
+ io:format("never got the change", [])
+ end.
+
+
+test_stream(Db) ->
+ Self = self(),
+ spawn(fun() ->
+ Result = run_query(Db, [{since, 13},
+ stream,
+ {timeout, 3000}]),
+ Self ! {result, Result}
+ end),
+
+ spawn(fun() ->
+ timer:sleep(1000),
+ {ok, Db1} = save_doc(Db, 13),
+ couch_mrview:refresh(Db1, <<"_design/bar">>),
+ {ok, Db2} = save_doc(Db1, 14),
+ couch_mrview:refresh(Db2, <<"_design/bar">>)
+ end),
+
+ Expect = {ok, 15,[{{14,13,<<"13">>},13},
+ {{15,14,<<"14">>},14}]},
+
+ receive
+ {result, Result} ->
+ etap:is(Result, Expect, "stream OK.")
+ after 5000 ->
+ io:format("never got the change", [])
+ end.
+
+
+save_doc(Db, Id) ->
+ Doc = couch_mrview_test_util:doc(Id),
+ {ok, _Rev} = couch_db:update_doc(Db, Doc, []),
+ {ok, _} = couch_db:ensure_full_commit(Db),
+ couch_db:reopen(Db).
+
+run_query(Db, Opts) ->
+ Fun = fun
+ (stop, {LastSeq, Acc}) ->
+ {ok, LastSeq, Acc};
+ (heartbeat, Acc) ->
+ {ok, [heartbeat | Acc]};
+ (Event, Acc) ->
+ {ok, [Event | Acc]}
+ end,
+ couch_mrview:refresh(Db, <<"_design/bar">>),
+ {ok, LastSeq, R} = couch_mrview_changes:handle_changes(Db, <<"_design/bar">>,
+ <<"baz">>, Fun, [], Opts),
+ {ok, LastSeq, lists:reverse(R)}.