You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:22:56 UTC

[36/50] mem3 commit: updated refs/heads/master to 64c0c74

Suggest moves from all donor nodes in parallel

Previously the generator would suggest all moves from the first node
before moving on to the second one.  In the case where the quantum of
jobs is much smaller than the number of moves per node, this results in
the other donors being neglected for long periods.

BugzID: 24612


Project: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/commit/f9c06d62
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/tree/f9c06d62
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-mem3/diff/f9c06d62

Branch: refs/heads/master
Commit: f9c06d629525873f2130caac81992a0cb42ab01f
Parents: a6ca5c6
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 30 17:40:20 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Wed Jul 23 18:46:27 2014 +0100

----------------------------------------------------------------------
 src/mem3_rebalance.erl | 87 +++++++++++++++++++++++++--------------------
 1 file changed, 49 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-mem3/blob/f9c06d62/src/mem3_rebalance.erl
----------------------------------------------------------------------
diff --git a/src/mem3_rebalance.erl b/src/mem3_rebalance.erl
index 5589e45..f00201a 100644
--- a/src/mem3_rebalance.erl
+++ b/src/mem3_rebalance.erl
@@ -36,7 +36,7 @@
 -include("mem3.hrl").
 
 -record (gacc, {
-    node,
+    donors,
     targets,
     moves,
     limit,
@@ -136,36 +136,28 @@ global_expand(TargetNodes0, LocalOps, Limit) ->
     end, shard_count_by_node(LocalOps)),
     TotalCount = lists:foldl(fun({_, C}, Sum) -> Sum + C end, 0, CountByNode),
     TargetLevel = TotalCount div length(TargetNodes),
-    FoldFun = fun
-        (_, Acc) when length(Acc) >= Limit ->
-            % We've already accumulated the max number of shard ops.
-            Acc;
-        ({_Node, Count}, Acc) when Count =< TargetLevel ->
-            % This node is not a donor.
-            Acc;
-        ({Node0, Count}, Acc) ->
-            Node = list_to_existing_atom(binary_to_list(Node0)),
-            InternalAcc0 = #gacc{
-                node = Node,
-                targets = TargetNodes0,
-                moves = Acc,
-                limit = erlang:min(Count - TargetLevel, Limit - length(Acc)),
-                target_level = TargetLevel
-            },
-            try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
-                #gacc{moves = Moves} ->
-                    Moves
-            catch
-                {complete, Moves} ->
-                    Moves
-            end
-    end,
-    lists:foldl(FoldFun, LocalOps, CountByNode).
+    Donors = [{list_to_existing_atom(binary_to_list(N)), C - TargetLevel} ||
+        {N, C} <- CountByNode, C > TargetLevel],
+    InternalAcc0 = #gacc{
+        donors = orddict:from_list(Donors),
+        targets = TargetNodes0,
+        moves = LocalOps,
+        limit = Limit - length(LocalOps),
+        target_level = TargetLevel
+    },
+    try mem3_shards:fold(fun donate_fold/2, InternalAcc0) of
+        #gacc{moves = Moves} ->
+            Moves
+    catch
+        {complete, Moves} ->
+            Moves
+    end.
 
 donate_fold(_Shard, #gacc{limit = 0, moves = Moves}) ->
     throw({complete, Moves});
-donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
+donate_fold(#shard{node = Node} = Shard, Acc0) ->
      #gacc{
+        donors = Donors,
         targets = Nodes,
         moves = Moves,
         limit = DC,
@@ -202,21 +194,40 @@ donate_fold(#shard{node = Node} = Shard, #gacc{node = Node} = Acc0) ->
                 false
         end
     end,
-    case {lists:member(Shard, Shards), lists:dropwhile(Fun, SortedByCount)} of
-        {false, _} ->
-            Acc0;
-        {true, []} ->
-            Acc0;
-        {true, [{Target, _} | _]} ->
-            print({move, Shard, Target}),
-            Acc0#gacc{
-                moves = [{move, Shard, Target} | Moves],
-                limit = DC - 1
-            }
+    case {lists:member(Shard, Shards), lists:keymember(Node, 1, Donors)} of
+        {true, true} ->
+            case lists:dropwhile(Fun, SortedByCount) of
+                [{Target, _} | _] ->
+                    NewMoves = [{move, Shard, Target} | Moves],
+                    print({move, Shard, Target}),
+                    Acc0#gacc{
+                        moves = NewMoves,
+                        limit = DC - 1,
+                        donors = update_donors(Node, Donors, NewMoves)
+                    };
+                [] ->
+                    Acc0
+            end;
+        _ ->
+            Acc0
     end;
 donate_fold(_Shard, Acc) ->
     Acc.
 
+update_donors(Node, Donors, Moves) ->
+    NewDonors = case orddict:fetch(Node, Donors) of
+        1 ->
+            orddict:erase(Node, Donors);
+        X ->
+            orddict:store(Node, X-1, Donors)
+    end,
+    case orddict:size(NewDonors) of
+        0 ->
+            throw({complete, Moves});
+        _ ->
+            NewDonors
+    end.
+
 get_shard_count(AtomKey, ShardsByNode) when is_atom(AtomKey) ->
     length(couch_util:get_value(AtomKey, ShardsByNode, [])).