[couchdb] 03/05: Improve smoosh_priority_queue

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch main
in repository

commit de05ea9f624d28ae99be9dba793f2e614c6f3658
Author: Nick Vatamaniuc <>
AuthorDate: Fri Nov 4 14:23:47 2022 -0400

    Improve smoosh_priority_queue
      * Remove `Value` parameter as it was never used.
      * Use `make_ref()` instead of `unique_integer([monotonic])` for performance [1].
      * Rewrite tests to get 100% coverage. Previous tests didn't actually run due
        to a setup error.
      * Switch from `size/1` to `qsize/1` as `size/1` is a built-in and we have to
        write `?MODULE:size/1` everywhere.
      * Do not remove elements from the gb_tree just for inspecting min and max
      * Remove `last_updated/2` logic, as that will be published in an ETS table.
      * Replace low level file serialization operations with `to_map/1`, `from_map/3`.
    4> timer:tc(fun() -> [make_ref() || _ <- lists:seq(1, 1000000)], ok end ).
    6> timer:tc(fun() -> [{erlang:monotonic_time(), erlang:unique_integer([monotonic])} || _ <- lists:seq(1, 1000000)], ok end ).
 src/smoosh/src/smoosh_priority_queue.erl        | 281 +++++++++++++-----------
 src/smoosh/test/smoosh_priority_queue_tests.erl | 167 --------------
 2 files changed, 154 insertions(+), 294 deletions(-)

diff --git a/src/smoosh/src/smoosh_priority_queue.erl b/src/smoosh/src/smoosh_priority_queue.erl
index 16deb0f76..b2ef4393d 100644
--- a/src/smoosh/src/smoosh_priority_queue.erl
+++ b/src/smoosh/src/smoosh_priority_queue.erl
@@ -12,19 +12,16 @@
--export([new/1, recover/1]).
--export([last_updated/2, is_key/2, in/4, in/5, out/1, size/1, info/1]).
--export([from_list/2, to_list/1]).
--define(VSN, 1).
+    new/1,
+    name/1,
+    in/4,
+    out/1,
+    info/1,
+    flush/1,
+    to_map/1,
+    from_map/3
 -record(priority_queue, {
@@ -33,145 +30,175 @@
 new(Name) ->
-    #priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}.
-recover(#priority_queue{name = Name, map = Map0} = Q) ->
-    case do_recover(file_name(Q)) of
-        {ok, Terms} ->
-            Map = maps:merge(Map0, Terms),
-            Tree = maps:fold(
-                fun(Key, {TreeKey, Value}, TreeAcc) ->
-                    gb_trees:enter(TreeKey, {Key, Value}, TreeAcc)
-                end,
-                gb_trees:empty(),
-                Map
-            ),
-            #priority_queue{name = Name, map = Map, tree = Tree};
-        error ->
-            Q
-    end.
+    #priority_queue{name = Name, map = #{}, tree = gb_trees:empty()}.
-write_to_file(#priority_queue{map = Map} = Q) ->
-    smoosh_utils:write_to_file(Map, file_name(Q), ?VSN).
+name(#priority_queue{name = Name}) ->
+    Name.
-flush(#priority_queue{name = Name} = Q) ->
-    Q#priority_queue{name = Name, map = maps:new(), tree = gb_trees:empty()}.
+flush(#priority_queue{} = Q) ->
+    Q#priority_queue{map = #{}, tree = gb_trees:empty()}.
-last_updated(Key, #priority_queue{map = Map}) ->
+in(Key, Priority, Capacity, #priority_queue{map = Map, tree = Tree} = Q) ->
     case maps:find(Key, Map) of
-        {ok, {_Priority, {LastUpdatedMTime, _MInt}}} ->
-            LastUpdatedMTime;
+        {ok, {Priority, _}} ->
+            % Priority matches, keep everything as is. This might be the case
+            % for upgrade channels, for instance, where priority is 1.
+            Q;
+        {ok, TreeKey} ->
+            Tree1 = gb_trees:delete(TreeKey, Tree),
+            insert(Key, Priority, Capacity, Q#priority_queue{tree = Tree1});
         error ->
-            false
+            insert(Key, Priority, Capacity, Q)
-is_key(Key, #priority_queue{map = Map}) ->
-    maps:is_key(Key, Map).
-in(Key, Value, Priority, Q) ->
-    in(Key, Value, Priority, infinity, Q).
-in(Key, Value, Priority, Capacity, #priority_queue{name = Name, map = Map, tree = Tree}) ->
-    Tree1 =
-        case maps:find(Key, Map) of
-            {ok, TreeKey} ->
-                gb_trees:delete_any(TreeKey, Tree);
-            error ->
-                Tree
-        end,
-    Now = {erlang:monotonic_time(), erlang:unique_integer([monotonic])},
-    TreeKey1 = {Priority, Now},
-    Tree2 = gb_trees:enter(TreeKey1, {Key, Value}, Tree1),
-    Map1 = maps:put(Key, TreeKey1, Map),
-    truncate(Capacity, #priority_queue{name = Name, map = Map1, tree = Tree2}).
-out(#priority_queue{name = Name, map = Map, tree = Tree}) ->
+out(#priority_queue{map = Map, tree = Tree} = Q) ->
     case gb_trees:is_empty(Tree) of
         true ->
         false ->
-            {_, {Key, Value}, Tree1} = gb_trees:take_largest(Tree),
-            Map1 = maps:remove(Key, Map),
-            Q = #priority_queue{name = Name, map = Map1, tree = Tree1},
-            {Key, Value, Q}
+            {_, Key, Tree1} = gb_trees:take_largest(Tree),
+            {Key, Q#priority_queue{map = maps:remove(Key, Map), tree = Tree1}}
-size(#priority_queue{tree = Tree}) ->
+qsize(#priority_queue{tree = Tree}) ->
 info(#priority_queue{tree = Tree} = Q) ->
-        {size, ?MODULE:size(Q)}
+        {size, qsize(Q)}
         | case gb_trees:is_empty(Tree) of
             true ->
             false ->
-                {Min, _, _} = gb_trees:take_smallest(Tree),
-                {Max, _, _} = gb_trees:take_largest(Tree),
+                {{Min, _}, _} = gb_trees:smallest(Tree),
+                {{Max, _}, _} = gb_trees:largest(Tree),
                 [{min, Min}, {max, Max}]
-from_list(Orddict, #priority_queue{name = Name}) ->
-    Map = maps:from_list(Orddict),
-    Tree = gb_trees:from_orddict(Orddict),
-    #priority_queue{name = Name, map = Map, tree = Tree}.
-to_list(#priority_queue{tree = Tree}) ->
-    gb_trees:to_list(Tree).
-is_empty(#priority_queue{tree = Tree}) ->
-    gb_trees:is_empty(Tree).
-file_name(#priority_queue{name = Name}) ->
-    filename:join(config:get("smoosh", "state_dir", "."), Name ++ ".waiting").
+insert(Key, Priority, Capacity, #priority_queue{tree = Tree, map = Map} = Q) ->
+    TreeKey = {Priority, make_ref()},
+    Tree1 = gb_trees:insert(TreeKey, Key, Tree),
+    truncate(Capacity, Q#priority_queue{map = Map#{Key => TreeKey}, tree = Tree1}).
-truncate(infinity, Q) ->
-    Q;
-truncate(Capacity, Q) when Capacity > 0 ->
-    truncate(Capacity, ?MODULE:size(Q), Q).
+truncate(Capacity, Q) when is_integer(Capacity), Capacity > 0 ->
+    truncate(Capacity, qsize(Q), Q).
-truncate(Capacity, Size, Q) when Size =< Capacity ->
+truncate(Capacity, Size, Q) when is_integer(Capacity), Size =< Capacity ->
-truncate(Capacity, Size, #priority_queue{name = Name, map = Map, tree = Tree}) when Size > 0 ->
-    {_, {Key, _}, Tree1} = gb_trees:take_smallest(Tree),
-    Q1 = #priority_queue{name = Name, map = maps:remove(Key, Map), tree = Tree1},
-    truncate(Capacity, ?MODULE:size(Q1), Q1).
-do_recover(FilePath) ->
-    case file:read_file(FilePath) of
-        {ok, Content} ->
-            <<Vsn, Binary/binary>> = Content,
-            try parse_queue(Vsn, ?VSN, Binary) of
-                Bin ->
-                    Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-                    couch_log:Level(
-                        "~p Successfully restored state file ~s", [?MODULE, FilePath]
-                    ),
-                    {ok, Bin}
-            catch
-                error:Reason ->
-                    couch_log:error(
-                        "~p Invalid queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
-                    ),
-                    file:delete(FilePath),
-                    error
-            end;
-        {error, enoent} ->
-            Level = smoosh_utils:log_level("compaction_log_level", "debug"),
-            couch_log:Level(
-                "~p (~p) Queue file ~s does not exist. Not restoring.", [?MODULE, enoent, FilePath]
-            ),
-            error;
-        {error, Reason} ->
-            couch_log:error(
-                "~p Cannot read the queue file (~p). Deleting ~s", [?MODULE, Reason, FilePath]
-            ),
-            file:delete(FilePath),
-            error
-    end.
-parse_queue(1, ?VSN, Binary) ->
-    erlang:binary_to_term(Binary, [safe]);
-parse_queue(Vsn, ?VSN, _) ->
-    error({unsupported_version, Vsn}).
+truncate(Capacity, Size, Q) when is_integer(Capacity), Size > 0 ->
+    #priority_queue{map = Map, tree = Tree} = Q,
+    {_, Key, Tree1} = gb_trees:take_smallest(Tree),
+    Q1 = Q#priority_queue{map = maps:remove(Key, Map), tree = Tree1},
+    truncate(Capacity, qsize(Q1), Q1).
+% Serialize the queue to/from simple maps which look like #{Key => Priority}.
+% The intent is for these to be used by the smoosh persistence facility.
+to_map(#priority_queue{map = Map}) ->
+    Fun = fun(_Key, {Priority, _Ref}) ->
+        Priority
+    end,
+    maps:map(Fun, Map).
+from_map(Name, Capacity, #{} = SerializedMap) ->
+    Fun = fun(Key, Priority, Acc) ->
+        insert(Key, Priority, Capacity, Acc)
+    end,
+    maps:fold(Fun, new(Name), SerializedMap).
+-define(K1, <<"db1">>).
+-define(K2, {<<"db1">>, <<"design/_doc1">>}).
+-define(K3, {index_cleanup, <<"db1">>}).
+-define(P1, 1).
+-define(P2, 2.4).
+-define(P3, infinity).
+basics_test() ->
+    Q = new("foo"),
+    ?assertMatch(#priority_queue{}, Q),
+    ?assertEqual("foo", name(Q)),
+    ?assertEqual([{size, 0}], info(Q)).
+empty_test() ->
+    Q = new("foo"),
+    ?assertEqual(false, out(Q)),
+    ?assertEqual(Q, truncate(1, Q)),
+    ?assertEqual(Q, flush(Q)),
+    ?assertEqual(#{}, to_map(Q)),
+    ?assertEqual(Q, from_map("foo", 1, #{})).
+one_element_test() ->
+    Q0 = new("foo"),
+    Q = in(?K1, ?P1, 1, Q0),
+    ?assertMatch(#priority_queue{}, Q),
+    ?assertEqual([{size, 1}, {min, 1}, {max, 1}], info(Q)),
+    ?assertEqual(Q, truncate(1, Q)),
+    ?assertMatch({?K1, #priority_queue{}}, out(Q)),
+    {?K1, Q2} = out(Q),
+    ?assertEqual(Q2, Q0),
+    ?assertEqual(#{?K1 => ?P1}, to_map(Q)),
+    Q3 = from_map("foo", 1, to_map(Q)),
+    ?assertEqual("foo", name(Q3)),
+    ?assertEqual([{size, 1}, {min, ?P1}, {max, ?P1}], info(Q3)),
+    ?assertEqual(to_map(Q), to_map(Q3)),
+    ?assertEqual(Q0, flush(Q)).
+multiple_elements_basics_test() ->
+    Q0 = new("foo"),
+    Q1 = in(?K1, ?P1, 10, Q0),
+    Q2 = in(?K2, ?P2, 10, Q1),
+    Q3 = in(?K3, ?P3, 10, Q2),
+    ?assertEqual([{size, 3}, {min, ?P1}, {max, ?P3}], info(Q3)),
+    ?assertEqual([?K3, ?K2, ?K1], drain(Q3)).
+update_element_same_priority_test() ->
+    Q0 = new("foo"),
+    Q1 = in(?K1, ?P1, 10, Q0),
+    ?assertEqual(Q1, in(?K1, ?P1, 10, Q1)).
+update_element_new_priority_test() ->
+    Q0 = new("foo"),
+    Q1 = in(?K1, ?P1, 10, Q0),
+    Q2 = in(?K2, ?P2, 10, Q1),
+    Q3 = in(?K1, ?P3, 10, Q2),
+    ?assertEqual([{size, 2}, {min, ?P2}, {max, ?P3}], info(Q3)),
+    ?assertEqual([?K1, ?K2], drain(Q3)).
+capacity_test() ->
+    Q0 = new("foo"),
+    Q1 = in(?K1, ?P1, 2, Q0),
+    % Capacity = 1, one element only remains
+    ?assertEqual([?K2], drain(in(?K2, ?P2, 1, Q1))),
+    % Capacity = 2, only top two elements remain
+    Q2 = in(?K2, ?P2, 2, Q1),
+    Q3 = in(?K3, ?P3, 2, Q2),
+    ?assertEqual([?K3, ?K2], drain(Q3)).
+a_lot_of_elements_test() ->
+    N = 100000,
+    KVs = lists:map(
+        fun(I) ->
+            P = rand:uniform(100),
+            {{I, P}, P}
+        end,
+        lists:seq(1, N)
+    ),
+    Q = from_map("foo", N, maps:from_list(KVs)),
+    ?assertMatch([{size, N} | _], info(Q)),
+    {_, Priorities} = lists:unzip(drain(Q)),
+    ?assertEqual(lists:reverse(lists:sort(Priorities)), Priorities).
+drain(Q) ->
+    lists:reverse(drain(out(Q), [])).
+drain(false, Acc) ->
+    Acc;
+drain({Key, Q}, Acc) ->
+    drain(out(Q), [Key | Acc]).
diff --git a/src/smoosh/test/smoosh_priority_queue_tests.erl b/src/smoosh/test/smoosh_priority_queue_tests.erl
deleted file mode 100644
index 289804ca5..000000000
--- a/src/smoosh/test/smoosh_priority_queue_tests.erl
+++ /dev/null
@@ -1,167 +0,0 @@
--define(PROP_PREFIX, "prop_").
--define(CAPACITY, 3).
--define(RANDOM_CHANNEL, lists:flatten(io_lib:format("~p", [erlang:timestamp()]))).
-setup() ->
-    Ctx = test_util:start_couch(),
-    Ctx.
-teardown(Ctx) ->
-    test_util:stop_couch(Ctx).
-smoosh_priority_queue_test_() ->
-    {
-        "smoosh priority queue test",
-        {
-            setup,
-            fun setup/0,
-            fun teardown/1,
-            [
-                fun prop_inverse_test_/0,
-                fun no_halt_on_corrupted_file_test/0,
-                fun no_halt_on_missing_file_test/0
-            ]
-        }
-    }.
-%% ==========
-%% Tests
-%% ----------
-%% define all tests to be able to run them individually
-prop_inverse_test_() ->
-    ?_test(begin
-        test_property(prop_inverse)
-    end).
-no_halt_on_corrupted_file_test() ->
-    ?_test(begin
-        Name = ?RANDOM_CHANNEL,
-        Q = smoosh_priority_queue:new(Name),
-        FilePath = smoosh_priority_queue:file_name(Q),
-        ok = file:write_file(FilePath, <<"garbage">>),
-        ?assertEqual(Q, smoosh_priority_queue:recover(Q)),
-        ok
-    end).
-no_halt_on_missing_file_test() ->
-    ?_test(begin
-        Name = ?RANDOM_CHANNEL,
-        Q = smoosh_priority_queue:new(Name),
-        FilePath = smoosh_priority_queue:file_name(Q),
-        ok = file:delete(FilePath),
-        ?assertEqual(Q, smoosh_priority_queue:recover(Q)),
-        ok
-    end).
-%% ==========
-%% Properties
-%% ----------
-prop_inverse() ->
-    ?FORALL(
-        Q,
-        queue(),
-        begin
-            List = smoosh_priority_queue:to_list(Q),
-            equal(Q, smoosh_priority_queue:from_list(List, Q))
-        end
-    ).
-%% ==========
-%% Generators
-%% ----------
-key() ->
-    proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
-value() ->
-    proper_types:oneof([proper_types:binary(), {proper_types:binary(), proper_types:binary()}]).
-priority() -> integer().
-item() -> {key(), value(), priority()}.
-items_list() ->
-    ?LET(L, list(item()), L).
-simple_queue() ->
-    ?LET(
-        L,
-        items_list(),
-        from_list(L)
-    ).
-with_deleted() ->
-    ?LET(
-        Q,
-        ?LET(
-            {{K0, V0, P0}, Q0},
-            {item(), simple_queue()},
-            smoosh_priority_queue:in(K0, V0, P0, ?CAPACITY, Q0)
-        ),
-        frequency([
-            {1, Q},
-            {2, element(3, smoosh_priority_queue:out(Q))}
-        ])
-    ).
-queue() ->
-    with_deleted().
-%% ==========================
-%% Proper related boilerplate
-%% --------------------------
-test_property(Property) when is_atom(Property) ->
-    test_property({atom_to_list(Property), Property});
-test_property({Id, Property}) ->
-    Name = string:sub_string(Id, length(?PROP_PREFIX) + 1),
-    Opts = [long_result, {numtests, 1000}, {to_file, user}],
-    {Name, {timeout, 60, fun() -> test_it(Property, Opts) end}}.
-test_it(Property, Opts) ->
-    case proper:quickcheck(?MODULE:Property(), Opts) of
-        true ->
-            true;
-        Else ->
-            erlang:error(
-                {propertyFailed, [
-                    {module, ?MODULE},
-                    {property, Property},
-                    {result, Else}
-                ]}
-            )
-    end.
-%% ================
-%% Helper functions
-%% ----------------
-new() ->
-    Q = smoosh_priority_queue:new("foo"),
-    smoosh_priority_queue:recover(Q).
-from_list(List) ->
-    lists:foldl(
-        fun({Key, Value, Priority}, Queue) ->
-            smoosh_priority_queue:in(Key, Value, Priority, ?CAPACITY, Queue)
-        end,
-        new(),
-        List
-    ).
-equal(Q1, Q2) ->
-    out_all(Q1) =:= out_all(Q2).
-out_all(Q) ->
-    out_all(Q, []).
-out_all(Q0, Acc) ->
-    case smoosh_priority_queue:out(Q0) of
-        {K, V, Q1} -> out_all(Q1, [{K, V} | Acc]);
-        false -> lists:reverse(Acc)
-    end.