You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/01/23 21:18:51 UTC

accumulo git commit: ACCUMULO-3480 ACCUMULO-3437 improved group balancer and fixed a bug

Repository: accumulo
Updated Branches:
  refs/heads/master 888263236 -> b0815affa


ACCUMULO-3480 ACCUMULO-3437 improved group balancer and fixed a bug


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b0815aff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b0815aff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b0815aff

Branch: refs/heads/master
Commit: b0815affade66ab04ca27b6fc3abaac400097469
Parents: 8882632
Author: Keith Turner <kt...@apache.org>
Authored: Fri Jan 23 15:12:27 2015 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jan 23 15:13:47 2015 -0500

----------------------------------------------------------------------
 .../server/master/balancer/GroupBalancer.java   | 111 ++++++++++++++++---
 .../master/balancer/GroupBalancerTest.java      |  79 ++++++++++++-
 2 files changed, 169 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0815aff/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
index 8feeb81..3ccf5ed 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletMigration;
 import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.io.Text;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -68,6 +69,7 @@ import com.google.common.collect.Table;
 public abstract class GroupBalancer extends TabletBalancer {
 
   private final String tableId;
+  private final Text textTableId;
   private long lastRun = 0;
 
   /**
@@ -77,6 +79,7 @@ public abstract class GroupBalancer extends TabletBalancer {
 
   public GroupBalancer(String tableId) {
     this.tableId = tableId;
+    this.textTableId = new Text(tableId);
   }
 
   protected Iterable<Pair<KeyExtent,Location>> getLocationProvider() {
@@ -90,6 +93,31 @@ public abstract class GroupBalancer extends TabletBalancer {
     return 60000;
   }
 
+  /**
+   * The maximum number of migrations to plan in a single balance pass; it is checked after each batch of moves is recorded, so a pass may exceed it slightly.
+   */
+  protected int getMaxMigrations() {
+    return 1000;
+  }
+
+  /**
+   * @return true if balancing should proceed: {@code current} holds at least two tservers and no extent in {@code migrations} belongs to this table.
+   */
+  protected boolean shouldBalance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations) {
+    // with fewer than two tservers there is nowhere to move tablets
+    if (current.size() >= 2) {
+      // defer while any extent of this table is present in migrations
+      for (KeyExtent extent : migrations) {
+        if (extent.getTableId().equals(textTableId)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    return false;
+  }
+
   @Override
   public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
       Map<KeyExtent,TServerInstance> assignments) {
@@ -172,7 +200,7 @@ public abstract class GroupBalancer extends TabletBalancer {
     // G2 | G2 | G2 | G2 | G2 | G2 <-- expected tablets for group 2
     // G3 | G3 | G3 | G3 | G3 | G3 <-- expected tablets for group 3
 
-    if (migrations.size() > 0 || current.size() < 2) {
+    if (!shouldBalance(current, migrations)) {
       return 5000;
     }
 
@@ -180,8 +208,6 @@ public abstract class GroupBalancer extends TabletBalancer {
       return 5000;
     }
 
-    lastRun = System.currentTimeMillis();
-
     MapCounter<String> groupCounts = new MapCounter<>();
     Map<TServerInstance,TserverGroupInfo> tservers = new HashMap<>();
 
@@ -196,7 +222,7 @@ public abstract class GroupBalancer extends TabletBalancer {
       String group = partitioner.apply(entry.getFirst());
       Location loc = entry.getSecond();
 
-      if (loc.equals(Location.NONE) || !current.containsKey(loc.getTserverInstance())) {
+      if (loc.equals(Location.NONE) || !tservers.containsKey(loc.getTserverInstance())) {
         return 5000;
       }
 
@@ -227,12 +253,21 @@ public abstract class GroupBalancer extends TabletBalancer {
 
     Moves moves = new Moves();
 
+    // The order of the following steps is important, because as ordered each step should not move any tablets moved by a previous step.
     balanceExpected(tservers, moves);
-    balanceExtraExpected(tservers, expectedExtra, moves);
-    balanceExtraMultiple(tservers, maxExtraGroups, moves);
-    balanceExtraExtra(tservers, maxExtraGroups, moves);
+    if (moves.size() < getMaxMigrations()) {
+      balanceExtraExpected(tservers, expectedExtra, moves);
+      if (moves.size() < getMaxMigrations()) {
+        boolean cont = balanceExtraMultiple(tservers, maxExtraGroups, moves);
+        if (cont && moves.size() < getMaxMigrations()) {
+          balanceExtraExtra(tservers, maxExtraGroups, moves);
+        }
+      }
+    }
 
-    populateMigrations(current, migrationsOut, moves);
+    populateMigrations(tservers.keySet(), migrationsOut, moves);
+
+    lastRun = System.currentTimeMillis();
 
     return 5000;
   }
@@ -266,7 +301,7 @@ public abstract class GroupBalancer extends TabletBalancer {
     }
   }
 
-  private static class TserverGroupInfo {
+  static class TserverGroupInfo {
 
     private Map<String,Integer> expectedCounts;
     private final Map<String,MutableInt> initialCounts = new HashMap<>();
@@ -411,6 +446,7 @@ public abstract class GroupBalancer extends TabletBalancer {
   private static class Moves {
 
     private final Table<TServerInstance,String,List<Move>> moves = HashBasedTable.create();
+    private int totalMoves = 0;
 
     public void move(String group, int num, TserverGroupInfo src, TserverGroupInfo dest) {
       Preconditions.checkArgument(num > 0);
@@ -426,6 +462,7 @@ public abstract class GroupBalancer extends TabletBalancer {
       }
 
       srcMoves.add(new Move(dest, num));
+      totalMoves += num;
     }
 
     public TServerInstance removeMove(TServerInstance src, String group) {
@@ -436,6 +473,7 @@ public abstract class GroupBalancer extends TabletBalancer {
 
       Move move = srcMoves.get(srcMoves.size() - 1);
       TServerInstance ret = move.dest.getTserverInstance();
+      totalMoves--;
 
       move.count--;
       if (move.count == 0) {
@@ -447,6 +485,10 @@ public abstract class GroupBalancer extends TabletBalancer {
 
       return ret;
     }
+    /** Returns the number of individual tablet moves recorded by move() and not yet consumed by removeMove(). */
+    public int size() {
+      return totalMoves;
+    }
   }
 
   private void balanceExtraExtra(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves) {
@@ -484,21 +526,28 @@ public abstract class GroupBalancer extends TabletBalancer {
               serversGroupsToRemove.add(new Pair<String,TServerInstance>(group, srcTgi.getTserverInstance()));
             }
 
-            if (destTgi.getExtras().size() >= maxExtraGroups) {
+            if (destTgi.getExtras().size() >= maxExtraGroups || moves.size() >= getMaxMigrations()) {
               break;
             }
           }
         }
 
-        surplusExtra.columnKeySet().removeAll(serversToRemove);
+        if (serversToRemove.size() > 0) {
+          surplusExtra.columnKeySet().removeAll(serversToRemove);
+        }
+
         for (Pair<String,TServerInstance> pair : serversGroupsToRemove) {
           surplusExtra.remove(pair.getFirst(), pair.getSecond());
         }
+
+        if (moves.size() >= getMaxMigrations()) {
+          break;
+        }
       }
     }
   }
 
-  private void balanceExtraMultiple(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves) {
+  private boolean balanceExtraMultiple(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves) {
     Multimap<String,TserverGroupInfo> extraMultiple = HashMultimap.create();
 
     for (TserverGroupInfo tgi : tservers.values()) {
@@ -510,10 +559,23 @@ public abstract class GroupBalancer extends TabletBalancer {
       }
     }
 
+    balanceExtraMultiple(tservers, maxExtraGroups, moves, extraMultiple, false);
+    if (moves.size() < getMaxMigrations() && extraMultiple.size() > 0) {
+      // no place to move so must exceed maxExtra temporarily... subsequent balancer calls will smooth things out
+      balanceExtraMultiple(tservers, maxExtraGroups, moves, extraMultiple, true);
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  private void balanceExtraMultiple(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves,
+      Multimap<String,TserverGroupInfo> extraMultiple, boolean alwaysAdd) {
+
     ArrayList<Pair<String,TserverGroupInfo>> serversToRemove = new ArrayList<>();
     for (TserverGroupInfo destTgi : tservers.values()) {
       Map<String,Integer> extras = destTgi.getExtras();
-      if (extras.size() < maxExtraGroups) {
+      if (alwaysAdd || extras.size() < maxExtraGroups) {
         serversToRemove.clear();
         for (String group : extraMultiple.keySet()) {
           if (!extras.containsKey(group)) {
@@ -529,7 +591,7 @@ public abstract class GroupBalancer extends TabletBalancer {
               serversToRemove.add(new Pair<String,TserverGroupInfo>(group, srcTgi));
             }
 
-            if (destTgi.getExtras().size() >= maxExtraGroups) {
+            if (destTgi.getExtras().size() >= maxExtraGroups || moves.size() >= getMaxMigrations()) {
               break;
             }
           }
@@ -539,7 +601,7 @@ public abstract class GroupBalancer extends TabletBalancer {
           extraMultiple.remove(pair.getFirst(), pair.getSecond());
         }
 
-        if (extraMultiple.size() == 0) {
+        if (extraMultiple.size() == 0 || moves.size() >= getMaxMigrations()) {
           break;
         }
       }
@@ -591,7 +653,7 @@ public abstract class GroupBalancer extends TabletBalancer {
               emptyServerGroups.add(new Pair<String,TServerInstance>(group, srcTgi.getTserverInstance()));
             }
 
-            if (destTgi.getExtras().size() >= expectedExtra) {
+            if (destTgi.getExtras().size() >= expectedExtra || moves.size() >= getMaxMigrations()) {
               break;
             }
           }
@@ -605,6 +667,9 @@ public abstract class GroupBalancer extends TabletBalancer {
           extraSurplus.remove(pair.getFirst(), pair.getSecond());
         }
 
+        if (moves.size() >= getMaxMigrations()) {
+          break;
+        }
       }
     }
   }
@@ -643,19 +708,26 @@ public abstract class GroupBalancer extends TabletBalancer {
           numToMove -= transfer;
 
           moves.move(group, transfer, surplusTsi, defecitTsi);
+          if (moves.size() >= getMaxMigrations()) {
+            return;
+          }
         }
       }
     }
   }
 
-  private void populateMigrations(SortedMap<TServerInstance,TabletServerStatus> current, List<TabletMigration> migrationsOut, Moves moves) {
+  private void populateMigrations(Set<TServerInstance> current, List<TabletMigration> migrationsOut, Moves moves) {
+    if (moves.size() == 0) {
+      return;
+    }
+
     Function<KeyExtent,String> partitioner = getPartitioner();
 
     for (Pair<KeyExtent,Location> entry : getLocationProvider()) {
       String group = partitioner.apply(entry.getFirst());
       Location loc = entry.getSecond();
 
-      if (loc.equals(Location.NONE) || !current.containsKey(loc.getTserverInstance())) {
+      if (loc.equals(Location.NONE) || !current.contains(loc.getTserverInstance())) {
         migrationsOut.clear();
         return;
       }
@@ -663,6 +735,9 @@ public abstract class GroupBalancer extends TabletBalancer {
       TServerInstance dest = moves.removeMove(loc.getTserverInstance(), group);
       if (dest != null) {
         migrationsOut.add(new TabletMigration(entry.getFirst(), loc.getTserverInstance(), dest));
+        if (moves.size() == 0) {
+          break;
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0815aff/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java
index 6e31001..39e454d 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -71,6 +72,10 @@ public class GroupBalancerTest {
     }
 
     public void balance() {
+      balance(10000); // run the balancer with a cap of 10000 migrations per pass
+    }
+
+    public void balance(final int maxMigrations) {
       GroupBalancer balancer = new GroupBalancer("1") {
 
         @Override
@@ -94,12 +99,17 @@ public class GroupBalancerTest {
         protected long getWaitTime() {
           return 0;
         }
+
+        @Override
+        protected int getMaxMigrations() {
+          return maxMigrations;
+        }
       };
 
-      balance(balancer);
+      balance(balancer, maxMigrations);
     }
 
-    public void balance(TabletBalancer balancer) {
+    public void balance(TabletBalancer balancer, int maxMigrations) {
 
       while (true) {
         Set<KeyExtent> migrations = new HashSet<>();
@@ -112,6 +122,8 @@ public class GroupBalancerTest {
 
         balancer.balance(current, migrations, migrationsOut);
 
+        Assert.assertTrue("Max Migration exceeded " + maxMigrations + " " + migrationsOut.size(), migrationsOut.size() <= (maxMigrations + 5));
+
         for (TabletMigration tabletMigration : migrationsOut) {
           Assert.assertEquals(tabletLocs.get(tabletMigration.tablet), tabletMigration.oldServer);
           Assert.assertTrue(tservers.contains(tabletMigration.newServer));
@@ -163,7 +175,8 @@ public class GroupBalancerTest {
         int tserverExtra = 0;
         for (String group : groupCounts.keySet()) {
           Assert.assertTrue(tgc.get(group) >= expectedCounts.get(group));
-          Assert.assertTrue(tgc.get(group) <= expectedCounts.get(group) + 1);
+          Assert.assertTrue("Group counts not as expected group:" + group + " actual:" + tgc.get(group) + " expected:" + (expectedCounts.get(group) + 1)
+              + " tserver:" + entry.getKey(), tgc.get(group) <= expectedCounts.get(group) + 1);
           tserverExtra += tgc.get(group) - expectedCounts.get(group);
         }
 
@@ -282,4 +295,64 @@ public class GroupBalancerTest {
       }
     }
   }
+
+  @Test
+  public void testMaxMigrations() {
+    // group N's tablets all start on tserver 192.168.1.N;
+    // tservers .4 and .5 are added with no tablets
+    int[] tabletsPerGroup = {9, 4, 5};
+    int[] migrationCaps = {1, 2, 3, 7, 10, 30};
+
+    for (int max : migrationCaps) {
+      TabletServers tservers = new TabletServers();
+
+      for (int g = 1; g <= tabletsPerGroup.length; g++) {
+        String tserver = "192.168.1." + g + ":9997";
+        for (int t = 1; t <= tabletsPerGroup[g - 1]; t++) {
+          tservers.addTablet("0" + g + t, tserver);
+        }
+      }
+
+      tservers.addTservers("192.168.1.4:9997", "192.168.1.5:9997");
+
+      // balance(max) asserts that each pass stays within the migration cap (plus a small slack)
+      tservers.balance(max);
+    }
+  }
+
+  @Test
+  public void bigTest() {
+    // 60 groups of 241 tablets each, placed pseudo-randomly (fixed seed) on tservers .1 through .249
+    Random rand = new Random(42);
+    TabletServers tservers = new TabletServers();
+    for (int group = 1; group <= 60; group++) {
+      for (int tablet = 1; tablet <= 241; tablet++) {
+        String tserver = "192.168.1." + (1 + rand.nextInt(249)) + ":9997";
+        tservers.addTablet(String.format("%02d:%d", group, tablet), tserver);
+      }
+    }
+    // register all 250 tservers, including .250 which starts with no tablets
+    for (int host = 1; host <= 250; host++) {
+      tservers.addTserver("192.168.1." + host + ":9997");
+    }
+    tservers.balance(1000);
+  }
+
+  @Test
+  public void bigTest2() {
+    TabletServers tservers = new TabletServers();
+    Random rand = new Random(42);
+
+    for (int g = 1; g <= 60; g++) {
+      int numTablets = rand.nextInt(1000); // drawn once per group; in the loop condition it was re-drawn every iteration
+      for (int t = 1; t <= numTablets; t++) {
+        tservers.addTablet(String.format("%02d:%d", g, t), "192.168.1." + (rand.nextInt(249) + 1) + ":9997");
+      }
+    }
+
+    for (int i = 1; i <= 250; i++) {
+      tservers.addTserver("192.168.1." + i + ":9997");
+    }
+    tservers.balance(1000);
+  }
 }