You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by bb...@apache.org on 2014/10/31 20:53:53 UTC

[38/41] couch-mrview commit: updated refs/heads/master to 28e51f3

Fix seqs in mrview updater

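This commit makes the mrview updater insert each document's own update
sequence into the view sequence tree, instead of the highest sequence of
the batch being written. It also fixes the btree comparison function for
that tree by dropping the custom `less` option.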


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/commit/eb05dd78
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/tree/eb05dd78
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/diff/eb05dd78

Branch: refs/heads/master
Commit: eb05dd782437eaea32f198d78ae10bbccae2edd4
Parents: 11d4c17
Author: Benjamin Bastian <be...@gmail.com>
Authored: Fri Sep 19 03:41:56 2014 -0700
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Thu Oct 30 19:28:42 2014 -0700

----------------------------------------------------------------------
 src/couch_mrview_updater.erl | 119 +++++++++++++++++++-------------------
 src/couch_mrview_util.erl    |   3 +-
 2 files changed, 60 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/eb05dd78/src/couch_mrview_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview_updater.erl b/src/couch_mrview_updater.erl
index c98c2f5..db5b5e9 100644
--- a/src/couch_mrview_updater.erl
+++ b/src/couch_mrview_updater.erl
@@ -206,8 +206,8 @@ write_results(Parent, State) ->
     case accumulate_writes(State, State#mrst.write_queue, nil) of
         stop ->
             Parent ! {new_state, State};
-        {Go, {Seq, ViewKVs, DocIdKeys, Log}} ->
-            NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys, Log),
+        {Go, {Seq, ViewKVs, DocIdKeys, Seqs, Log}} ->
+            NewState = write_kvs(State, Seq, ViewKVs, DocIdKeys, Seqs, Log),
             if Go == stop ->
                 Parent ! {new_state, NewState};
             true ->
@@ -229,17 +229,17 @@ start_query_server(State) ->
 
 
 accumulate_writes(State, W, Acc0) ->
-    {Seq, ViewKVs, DocIdKVs, Log} = case Acc0 of
-        nil -> {0, [{V#mrview.id_num, {[], []}} || V <- State#mrst.views], [], dict:new()};
+    {Seq, ViewKVs, DocIdKVs, Seqs, Log} = case Acc0 of
+        nil -> {0, [{V#mrview.id_num, {[], []}} || V <- State#mrst.views], [], dict:new(), dict:new()};
         _ -> Acc0
     end,
     case couch_work_queue:dequeue(W) of
         closed when Seq == 0 ->
             stop;
         closed ->
-            {stop, {Seq, ViewKVs, DocIdKVs, Log}};
+            {stop, {Seq, ViewKVs, DocIdKVs, Seqs, Log}};
         {ok, Info} ->
-            {_, _, NewIds, _} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs, Log),
+            {_, _, NewIds, _, _} = Acc = merge_results(Info, Seq, ViewKVs, DocIdKVs, Seqs, Log),
             case accumulate_more(length(NewIds)) of
                 true -> accumulate_writes(State, W, Acc);
                 false -> {ok, Acc}
@@ -256,29 +256,27 @@ accumulate_more(NumDocIds) ->
         andalso CurrMem < list_to_integer(MinSize).
 
 
-merge_results([], SeqAcc, ViewKVs, DocIdKeys, Log) ->
-    {SeqAcc, ViewKVs, DocIdKeys, Log};
-merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys, Log) ->
-    Fun = fun(RawResults, {VKV, DIK, Log2}) ->
-        merge_results(RawResults, VKV, DIK, Log2)
+merge_results([], SeqAcc, ViewKVs, DocIdKeys, Seqs, Log) ->
+    {SeqAcc, ViewKVs, DocIdKeys, Seqs, Log};
+merge_results([{Seq, Results} | Rest], SeqAcc, ViewKVs, DocIdKeys, Seqs, Log) ->
+    Fun = fun(RawResults, {VKV, DIK, Seqs2, Log2}) ->
+        merge_results(RawResults, VKV, DIK, Seqs2, Log2)
     end,
-    {ViewKVs1, DocIdKeys1, Log1} = lists:foldl(Fun, {ViewKVs, DocIdKeys, Log},
-                                               Results),
-    merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1,
-                  Log1).
+    {ViewKVs1, DocIdKeys1, Seqs1, Log1} = lists:foldl(Fun, {ViewKVs, DocIdKeys, Seqs, Log}, Results),
+    merge_results(Rest, erlang:max(Seq, SeqAcc), ViewKVs1, DocIdKeys1, Seqs1, Log1).
 
 
-merge_results({DocId, _Seq, Rev, []}, ViewKVs, DocIdKeys, Log) ->
-    {ViewKVs, [{DocId, []} | DocIdKeys], dict:store({DocId, Rev}, [], Log)};
-merge_results({DocId, Seq, Rev, RawResults}, ViewKVs, DocIdKeys, Log) ->
+merge_results({DocId, Seq, Rev, []}, ViewKVs, DocIdKeys, Seqs, Log) ->
+    {ViewKVs, [{DocId, []} | DocIdKeys], dict:store(DocId, Seq, Seqs), dict:store({DocId, Rev}, [], Log)};
+merge_results({DocId, Seq, Rev, RawResults}, ViewKVs, DocIdKeys, Seqs, Log) ->
     JsonResults = couch_query_servers:raw_to_ejson(RawResults),
     Results = [[list_to_tuple(Res) || Res <- FunRs] || FunRs <- JsonResults],
     case lists:flatten(Results) of
         [] ->
-            {ViewKVs, [{DocId, []} | DocIdKeys], dict:store({DocId, Rev}, [], Log)};
+            {ViewKVs, [{DocId, []} | DocIdKeys], dict:store(DocId, Seq, Seqs), dict:store({DocId, Rev}, [], Log)};
         _ ->
             {ViewKVs1, ViewIdKeys, Log1} = insert_results(DocId, Seq, Rev, Results, ViewKVs, [], [], Log),
-            {ViewKVs1, [ViewIdKeys | DocIdKeys], Log1}
+            {ViewKVs1, [ViewIdKeys | DocIdKeys], dict:store(DocId, Seq, Seqs), Log1}
     end.
 
 
@@ -305,19 +303,25 @@ insert_results(DocId, Seq, Rev, [KVs | RKVs], [{Id, {VKVs, SKVs}} | RVKVs], VKVA
                   [{Id, {FinalKVs, FinalSKVs}} | VKVAcc], VIdKeys0, Log1).
 
 
-write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Log) ->
+write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
     #mrst{
         id_btree=IdBtree,
         log_btree=LogBtree,
         first_build=FirstBuild
     } = State,
 
+    Revs = dict:from_list(dict:fetch_keys(Log0)),
+
+    Log = dict:fold(fun({Id, _Rev}, DIKeys, Acc) ->
+        dict:store(Id, DIKeys, Acc)
+    end, dict:new(), Log0),
+
     {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
     ToRemByView = collapse_rem_keys(ToRemove, dict:new()),
 
     {ok, SeqsToAdd, SeqsToRemove, LogBtree2} = case LogBtree of
         nil -> {ok, undefined, undefined, nil};
-        _ -> update_log(LogBtree, Log, UpdateSeq, FirstBuild)
+        _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild)
     end,
 
     UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
@@ -336,12 +340,13 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Log) ->
             SKVs1 = SKVs ++ SToAdd,
 
             {ok, SeqBtree2} = if SIndexed ->
-                RemSKs = [{Seq, Key} || {Key, Seq, _} <- SToRem],
+                RemSKs = lists:sort([{Seq, Key} || {Key, Seq, _} <- SToRem]),
                 couch_btree:add_remove(View#mrview.seq_btree,
                                        SKVs1, RemSKs);
             true ->
                 {ok, nil}
             end,
+
             {ok, KeyBySeqBtree2} = if KSIndexed ->
                 RemKSs = [{[Key, Seq], DocId} || {Key, Seq, DocId} <- SToRem],
                 couch_btree:add_remove(View#mrview.key_byseq_btree,
@@ -367,7 +372,6 @@ write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Log) ->
         log_btree=LogBtree2
     }.
 
-
 update_id_btree(Btree, DocIdKeys, true) ->
     ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
     couch_btree:query_modify(Btree, [], ToAdd, []);
@@ -377,64 +381,59 @@ update_id_btree(Btree, DocIdKeys, _) ->
     ToRem = [Id || {Id, DIKeys} <- DocIdKeys, DIKeys == []],
     couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem).
 
-update_log(Btree, Log, _UpdatedSeq, true) ->
-    ToAdd = lists:flatmap(fun({{Id, _Rev}, Keys}) ->
-        case Keys of
-            [] ->
-                [];
-            _ ->
-                [{Id, Keys}]
-        end
-    end, dict:to_list(Log)),
+
+update_log(Btree, Log, _Revs, _Seqs, true) ->
+    ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log),
+                             DIKeys /= []],
     {ok, LogBtree2} = couch_btree:add_remove(Btree, ToAdd, []),
     {ok, dict:new(), dict:new(), LogBtree2};
-update_log(Btree, Log, UpdatedSeq, _) ->
+update_log(Btree, Log, Revs, Seqs, _) ->
     %% build list of updated keys and Id
-    Revs = dict:from_list(dict:fetch_keys(Log)),
-    Log0 = dict:fold(fun({Id, _Rev}, DIKeys, Acc) ->
-        dict:store(Id, DIKeys, Acc)
-    end, dict:new(), Log),
-    {ToLook, Updated, Removed} = dict:fold(fun(Id, DIKeys, {IdsAcc, KeysAcc, RemAcc}) ->
-        {KeysAcc1, RemAcc1} = lists:foldl(fun({ViewId, {Key, _Seq, Op}}, Acc) ->
-            {KeysAcc2, RemAcc2} = Acc,
-            case Op of
-                add ->
-                    {[{Id, ViewId, Key} | KeysAcc2], RemAcc2};
-                del ->
-                    {KeysAcc2, [{Id, ViewId, Key} | RemAcc2]}
-            end
-        end, {KeysAcc, RemAcc}, DIKeys),
-        {[Id | IdsAcc], KeysAcc1, RemAcc1}
-    end, {[], [], []}, Log0),
+    {ToLook, Updated, Removed} = dict:fold(
+        fun(Id, [], {IdsAcc, KeysAcc, RemAcc}) ->
+            {[Id | IdsAcc], KeysAcc, RemAcc};
+        (Id, DIKeys, {IdsAcc, KeysAcc, RemAcc}) ->
+            {KeysAcc1, RemAcc1} = lists:foldl(fun({ViewId, {Key, _Seq, Op}}, {KeysAcc2, RemAcc2}) ->
+                case Op of
+                    add -> {[{Id, ViewId, Key}|KeysAcc2], RemAcc2};
+                    del -> {KeysAcc2, [{Id, ViewId, Key}|RemAcc2]}
+                end
+            end, {KeysAcc, RemAcc}, DIKeys),
+            {[Id | IdsAcc], KeysAcc1, RemAcc1}
+        end, {[], [], []}, Log),
 
     MapFun = fun({ok, KV}) -> [KV]; (not_found) -> [] end,
     KVsToLook = lists:flatmap(MapFun, couch_btree:lookup(Btree, ToLook)),
+
     {Log1, AddAcc, DelAcc} = lists:foldl(fun({DocId, VIdKeys}, Acc) ->
-        lists:foldl(fun({ViewId, {Key, Seq, _Op}}, {Log4, AddAcc4, DelAcc4}) ->
-            case lists:member({DocId, ViewId, Key}, Updated) of
+        lists:foldl(fun({ViewId, {Key, OldSeq, _Op}}, {Log4, AddAcc4, DelAcc4}) ->
+
+            IsUpdated = lists:member({DocId, ViewId, Key}, Updated),
+            IsRemoved = lists:member({DocId, ViewId, Key}, Removed),
+
+            case IsUpdated of
                 true ->
                     % the log is updated, deleted old record from the view
-                    DelAcc5 = dict:append(ViewId, {Key, Seq, DocId}, DelAcc4),
+                    DelAcc5 = dict:append(ViewId, {Key, OldSeq, DocId}, DelAcc4),
                     {Log4, AddAcc4, DelAcc5};
                 false ->
                     % an update operation has been logged for this key. We must
                     % now record it as deleted in the log, remove the old record
                     % in the view and update the view with a removed record.
-                    Log5 = case lists:member({DocId, ViewId, Key}, Removed) of
+                    NewSeq = dict:fetch(DocId, Seqs),
+                    Log5 = case IsRemoved of
                         false ->
-                            LogValue = {ViewId, {Key, UpdatedSeq, del}},
-                            dict:append(DocId, LogValue, Log4);
+                            dict:append(DocId, {ViewId, {Key, NewSeq, del}}, Log4);
                         true ->
                             Log4
                     end,
-                    DelAcc5 = dict:append(ViewId, {Key, Seq, DocId}, DelAcc4),
                     Rev = dict:fetch(DocId, Revs),
-                    AddValue = {{UpdatedSeq, Key}, {DocId, ?REM_VAL, Rev}},
-                    AddAcc5 = dict:append(ViewId, AddValue, AddAcc4),
+                    DelAcc5 = dict:append(ViewId, {Key, OldSeq, DocId}, DelAcc4),
+                    AddAcc5 = dict:append(ViewId, {{NewSeq, Key}, {DocId, ?REM_VAL, Rev}}, AddAcc4),
                     {Log5, AddAcc5, DelAcc5}
             end
         end, Acc, VIdKeys)
-    end, {Log0, dict:new(), dict:new()}, KVsToLook),
+    end, {Log, dict:new(), dict:new()}, KVsToLook),
 
     ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log1), DIKeys /= []],
     % store the new logs

http://git-wip-us.apache.org/repos/asf/couchdb-couch-mrview/blob/eb05dd78/src/couch_mrview_util.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview_util.erl b/src/couch_mrview_util.erl
index 6054e3a..9669736 100644
--- a/src/couch_mrview_util.erl
+++ b/src/couch_mrview_util.erl
@@ -289,8 +289,7 @@ open_view(Db, Fd, Lang, {BTState, SeqBTState, KSeqBTState, USeq, PSeq}, View) ->
 
     BySeqReduceFun = fun couch_db_updater:btree_by_seq_reduce/2,
     {ok, SeqBtree} = if View#mrview.seq_indexed ->
-        ViewSeqBtOpts = [{less, fun less_json_seqs/2},
-                         {reduce, BySeqReduceFun},
+        ViewSeqBtOpts = [{reduce, BySeqReduceFun},
                          {compression, couch_db:compression(Db)}],
 
         couch_btree:open(SeqBTState, Fd, ViewSeqBtOpts);