You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by bb...@apache.org on 2014/10/31 20:53:33 UTC
[10/11] couch commit: updated refs/heads/master to 6125862
Change return format of _view_changes
This commit changes _view_changes responses so that KV pairs are returned in
the following format:
{..."add": [["key", "val"],["key2", "val2"]], "remove": ["oldkey"]}
Note that the "add" field is a list of lists rather than an object
because: 1) there can be multiple values for a given key and 2) keys can
be non-strings (JSON object keys must be strings).
Also note that if a view update adds or removes some (but not
all) values for a given key, all values associated with that key will be
returned in the _view_changes response.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/6125862c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/6125862c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/6125862c
Branch: refs/heads/master
Commit: 6125862cfed4e948f0f188faa46780dfb3bcbd0d
Parents: 6e7f19c
Author: Benjamin Bastian <be...@gmail.com>
Authored: Mon Sep 15 19:10:44 2014 -0700
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Fri Oct 31 12:43:53 2014 -0700
----------------------------------------------------------------------
src/couch_changes.erl | 154 ++++++++++++++++++++++++++++++++++++++-------
1 file changed, 131 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/6125862c/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index 2ff78e7..2b2647f 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -49,7 +49,9 @@
doc_options,
conflicts,
timeout,
- timeout_fun
+ timeout_fun,
+ aggregation_kvs,
+ aggregation_results
}).
handle_db_changes(Args, Req, Db) ->
@@ -450,7 +452,9 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, D
timeout_fun = TimeoutFun,
ddoc_name = DDocName,
view_name = ViewName,
- view = View
+ view = View,
+ aggregation_results=[],
+ aggregation_kvs=[]
}.
send_changes(Acc, Dir, FirstRound) ->
@@ -460,16 +464,36 @@ send_changes(Acc, Dir, FirstRound) ->
filter = Filter,
view = View
} = Acc,
- EnumFun = fun changes_enumerator/2,
+ DbEnumFun = fun changes_enumerator/2,
case can_optimize(FirstRound, Filter) of
{true, Fun} ->
- Fun(Db, StartSeq, Dir, EnumFun, Acc, Filter);
+ Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter);
_ ->
- case View of
- undefined ->
- couch_db:changes_since(Db, StartSeq, EnumFun, [{dir, Dir}], Acc);
- #mrview{} ->
- couch_mrview:view_changes_since(View, StartSeq, EnumFun, [{dir, Dir}], Acc)
+ case {View, Filter} of
+ {#mrview{}, {fast_view, _, _, _}} ->
+ couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+ {undefined, _} ->
+ couch_db:changes_since(Db, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+ {#mrview{}, _} ->
+ ViewEnumFun = fun view_changes_enumerator/2,
+ {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
+ case Acc0 of
+ #changes_acc{aggregation_results=[]} ->
+ {Go, Acc0};
+ _ ->
+ #changes_acc{
+ aggregation_results = AggResults,
+ aggregation_kvs = AggKVs,
+ user_acc = UserAcc,
+ callback = Callback,
+ resp_type = ResponseType,
+ prepend = Prepend
+ } = Acc0,
+ ChangesRow = view_changes_row(AggResults, AggKVs, Acc0),
+ UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc0#changes_acc{user_acc=UserAcc0}}
+ end
end
end.
@@ -589,18 +613,84 @@ maybe_refresh_view(Db, DDocName, ViewName) ->
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
Callback({stop, EndSeq}, ResponseType, UserAcc).
+view_changes_enumerator(Value, Acc) ->
+ #changes_acc{
+ filter = Filter, callback = Callback, prepend = Prepend,
+ user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+ timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq,
+ aggregation_kvs=AggKVs, aggregation_results=AggResults
+ } = Acc,
+
+ Results0 = view_filter(Db, Value, Filter),
+ Results = [Result || Result <- Results0, Result /= null],
+ {{Seq, _}, _} = Value,
+
+ Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+
+ if CurrentSeq =:= Seq ->
+ NewAggKVs = case Results of
+ [] -> AggKVs;
+ _ -> [Value|AggKVs]
+ end,
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ Acc0 = Acc#changes_acc{
+ seq = Seq,
+ user_acc = UserAcc2,
+ aggregation_kvs=NewAggKVs
+ },
+ case Done of
+ stop -> {stop, Acc0};
+ ok -> {Go, Acc0}
+ end;
+ AggResults =/= [] ->
+ {NewAggKVs, NewAggResults} = case Results of
+ [] -> {[], []};
+ _ -> {[Value], Results}
+ end,
+ if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
+ ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
+ UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq, user_acc = UserAcc2, limit = Limit - 1,
+ aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}};
+ true ->
+ ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
+ UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2,
+ limit = Limit - 1, aggregation_kvs=[Value],
+ aggregation_results=Results}}
+ end;
+ true ->
+ {NewAggKVs, NewAggResults} = case Results of
+ [] -> {[], []};
+ _ -> {[Value], Results}
+ end,
+ {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+ Acc0 = Acc#changes_acc{
+ seq = Seq,
+ user_acc = UserAcc2,
+ aggregation_kvs=NewAggKVs,
+ aggregation_results=NewAggResults
+ },
+ case Done of
+ stop -> {stop, Acc0};
+ ok -> {Go, Acc0}
+ end
+ end.
+
changes_enumerator(Value0, Acc) ->
#changes_acc{
filter = Filter, callback = Callback, prepend = Prepend,
user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
- timeout = Timeout, timeout_fun = TimeoutFun, view = View
+ timeout = Timeout, timeout_fun = TimeoutFun
} = Acc,
- {Value, Results0} = case {View, Filter} of
- {_, {fast_view, _, _, _}} ->
+ {Value, Results0} = case Filter of
+ {fast_view, _, _, _} ->
fast_view_filter(Db, Value0, Filter);
- {#mrview{}, _} ->
- {Value0, view_filter(Db, Value0, Filter)};
- {_, _} ->
+ _ ->
{Value0, filter(Db, Value0, Filter)}
end,
Results = [Result || Result <- Results0, Result /= null],
@@ -638,15 +728,33 @@ changes_enumerator(Value0, Acc) ->
-changes_row(Results, DocInfo, #changes_acc{filter={fast_view,_,_,_}}=Acc) ->
- format_doc_info_change(Results, DocInfo, Acc);
-changes_row(Results, KV, #changes_acc{view=#mrview{}}=Acc) ->
- {{Seq, Key}, {Id, Value, Rev}} = KV,
- {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}] ++ maybe_get_changes_doc({Id, Rev}, Acc)};
-changes_row(Results, #doc_info{}=DocInfo, Acc) ->
- format_doc_info_change(Results, DocInfo, Acc).
+view_changes_row(Results, KVs, Acc) ->
+ {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) ->
+ {{_Seq, Key}, {_Id, Value, _Rev}} = Row,
+ case Value of
+ removed ->
+ {AddAcc, [Key|RemAcc]};
+ {dups, DupValues} ->
+ AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) ->
+ [[Key, DupValue]|AddAcc0]
+ end, AddAcc, DupValues),
+ {AddAcc1, RemAcc};
+ _ ->
+ {[[Key, Value]|AddAcc], RemAcc}
+ end
+ end, {[], []}, KVs),
+
+ % Seq, Id, and Rev should be the same for all KVs, since we're aggregating
+ % by seq.
+ [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs,
+
+ {[
+ {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add},
+ {<<"remove">>, Remove}, {<<"changes">>, Results}
+ ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
+
-format_doc_info_change(Results, #doc_info{}=DocInfo, Acc) ->
+changes_row(Results, DocInfo, Acc) ->
#doc_info{
id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
} = DocInfo,