You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by bb...@apache.org on 2014/10/31 20:53:33 UTC

[10/11] couch commit: updated refs/heads/master to 6125862

Change return format of _view_changes

This commit changes _view_changes responses so that KV pairs are
returned in the format:

    {..."add": [["key", "val"],["key2", "val2"]], "remove": ["oldkey"]}

Note that the "add" field is a list of lists rather than an object
because: 1) there can be multiple values for a given key and 2) keys can
be non-strings (JSON object keys must be strings).

Also note that if a view update adds or removes some (but not all)
values for a given key, all values associated with that key will be
returned in the _view_changes response.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/6125862c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/6125862c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/6125862c

Branch: refs/heads/master
Commit: 6125862cfed4e948f0f188faa46780dfb3bcbd0d
Parents: 6e7f19c
Author: Benjamin Bastian <be...@gmail.com>
Authored: Mon Sep 15 19:10:44 2014 -0700
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Fri Oct 31 12:43:53 2014 -0700

----------------------------------------------------------------------
 src/couch_changes.erl | 154 ++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 131 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/6125862c/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index 2ff78e7..2b2647f 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -49,7 +49,9 @@
     doc_options,
     conflicts,
     timeout,
-    timeout_fun
+    timeout_fun,
+    aggregation_kvs,
+    aggregation_results
 }).
 
 handle_db_changes(Args, Req, Db) ->
@@ -450,7 +452,9 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, D
         timeout_fun = TimeoutFun,
         ddoc_name = DDocName,
         view_name = ViewName,
-        view = View
+        view = View,
+        aggregation_results=[],
+        aggregation_kvs=[]
     }.
 
 send_changes(Acc, Dir, FirstRound) ->
@@ -460,16 +464,36 @@ send_changes(Acc, Dir, FirstRound) ->
         filter = Filter,
         view = View
     } = Acc,
-    EnumFun = fun changes_enumerator/2,
+    DbEnumFun = fun changes_enumerator/2,
     case can_optimize(FirstRound, Filter) of
         {true, Fun} ->
-            Fun(Db, StartSeq, Dir, EnumFun, Acc, Filter);
+            Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter);
         _ ->
-            case View of
-                undefined ->
-                    couch_db:changes_since(Db, StartSeq, EnumFun, [{dir, Dir}], Acc);
-                #mrview{} ->
-                    couch_mrview:view_changes_since(View, StartSeq, EnumFun, [{dir, Dir}], Acc)
+            case {View, Filter}  of
+                {#mrview{}, {fast_view, _, _, _}} ->
+                    couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+                {undefined, _} ->
+                    couch_db:changes_since(Db, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
+                {#mrview{}, _} ->
+                    ViewEnumFun = fun view_changes_enumerator/2,
+                    {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
+                    case Acc0 of
+                        #changes_acc{aggregation_results=[]} ->
+                            {Go, Acc0};
+                        _ ->
+                            #changes_acc{
+                                aggregation_results = AggResults,
+                                aggregation_kvs = AggKVs,
+                                user_acc = UserAcc,
+                                callback = Callback,
+                                resp_type = ResponseType,
+                                prepend = Prepend
+                            } = Acc0,
+                            ChangesRow = view_changes_row(AggResults, AggKVs, Acc0),
+                            UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+                            reset_heartbeat(),
+                            {Go, Acc0#changes_acc{user_acc=UserAcc0}}
+                    end
             end
     end.
 
@@ -589,18 +613,84 @@ maybe_refresh_view(Db, DDocName, ViewName) ->
 end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
     Callback({stop, EndSeq}, ResponseType, UserAcc).
 
+view_changes_enumerator(Value, Acc) ->
+    #changes_acc{
+        filter = Filter, callback = Callback, prepend = Prepend,
+        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
+        timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq,
+        aggregation_kvs=AggKVs, aggregation_results=AggResults
+    } = Acc,
+
+    Results0 = view_filter(Db, Value, Filter),
+    Results = [Result || Result <- Results0, Result /= null],
+    {{Seq, _}, _} = Value,
+
+    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
+
+    if CurrentSeq =:= Seq ->
+        NewAggKVs = case Results of
+            [] -> AggKVs;
+            _ -> [Value|AggKVs]
+        end,
+        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+        Acc0 = Acc#changes_acc{
+            seq = Seq,
+            user_acc = UserAcc2,
+            aggregation_kvs=NewAggKVs
+        },
+        case Done of
+            stop -> {stop, Acc0};
+            ok -> {Go, Acc0}
+        end;
+    AggResults =/= [] ->
+        {NewAggKVs, NewAggResults} = case Results of
+            [] -> {[], []};
+            _ -> {[Value], Results}
+        end,
+        if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
+            ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
+            UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+            reset_heartbeat(),
+            {Go, Acc#changes_acc{
+                seq = Seq, user_acc = UserAcc2, limit = Limit - 1,
+                aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}};
+        true ->
+            ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
+            UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+            reset_heartbeat(),
+            {Go, Acc#changes_acc{
+                seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2,
+                limit = Limit - 1, aggregation_kvs=[Value],
+                aggregation_results=Results}}
+        end;
+    true ->
+        {NewAggKVs, NewAggResults} = case Results of
+            [] -> {[], []};
+            _ -> {[Value], Results}
+        end,
+        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
+        Acc0 = Acc#changes_acc{
+            seq = Seq,
+            user_acc = UserAcc2,
+            aggregation_kvs=NewAggKVs,
+            aggregation_results=NewAggResults
+        },
+        case Done of
+            stop -> {stop, Acc0};
+            ok -> {Go, Acc0}
+        end
+    end.
+
 changes_enumerator(Value0, Acc) ->
     #changes_acc{
         filter = Filter, callback = Callback, prepend = Prepend,
         user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
-        timeout = Timeout, timeout_fun = TimeoutFun, view = View
+        timeout = Timeout, timeout_fun = TimeoutFun
     } = Acc,
-    {Value, Results0} = case {View, Filter} of
-        {_, {fast_view, _, _, _}} ->
+    {Value, Results0} = case Filter of
+        {fast_view, _, _, _} ->
             fast_view_filter(Db, Value0, Filter);
-        {#mrview{}, _} ->
-            {Value0, view_filter(Db, Value0, Filter)};
-        {_, _} ->
+        _ ->
             {Value0, filter(Db, Value0, Filter)}
     end,
     Results = [Result || Result <- Results0, Result /= null],
@@ -638,15 +728,33 @@ changes_enumerator(Value0, Acc) ->
 
 
 
-changes_row(Results, DocInfo, #changes_acc{filter={fast_view,_,_,_}}=Acc) ->
-    format_doc_info_change(Results, DocInfo, Acc);
-changes_row(Results, KV, #changes_acc{view=#mrview{}}=Acc) ->
-    {{Seq, Key}, {Id, Value, Rev}} = KV,
-    {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}] ++ maybe_get_changes_doc({Id, Rev}, Acc)};
-changes_row(Results, #doc_info{}=DocInfo, Acc) ->
-    format_doc_info_change(Results, DocInfo, Acc).
+view_changes_row(Results, KVs, Acc) ->
+    {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) ->
+        {{_Seq, Key}, {_Id, Value, _Rev}} = Row,
+        case Value of
+            removed ->
+                {AddAcc, [Key|RemAcc]};
+            {dups, DupValues} ->
+                AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) ->
+                    [[Key, DupValue]|AddAcc0]
+                end, AddAcc, DupValues),
+                {AddAcc1, RemAcc};
+            _ ->
+                {[[Key, Value]|AddAcc], RemAcc}
+        end
+    end, {[], []}, KVs),
+
+    % Seq, Id, and Rev should be the same for all KVs, since we're aggregating
+    % by seq.
+    [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs,
+
+    {[
+        {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add},
+        {<<"remove">>, Remove}, {<<"changes">>, Results}
+    ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
+
 
-format_doc_info_change(Results, #doc_info{}=DocInfo, Acc) ->
+changes_row(Results, DocInfo, Acc) ->
     #doc_info{
         id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
     } = DocInfo,