You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2015/01/23 21:18:51 UTC
accumulo git commit: ACCUMULO-3480 ACCUMULO-3437 improved group
balancer and fixed a bug
Repository: accumulo
Updated Branches:
refs/heads/master 888263236 -> b0815affa
ACCUMULO-3480 ACCUMULO-3437 improved group balancer and fixed a bug
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b0815aff
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b0815aff
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b0815aff
Branch: refs/heads/master
Commit: b0815affade66ab04ca27b6fc3abaac400097469
Parents: 8882632
Author: Keith Turner <kt...@apache.org>
Authored: Fri Jan 23 15:12:27 2015 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jan 23 15:13:47 2015 -0500
----------------------------------------------------------------------
.../server/master/balancer/GroupBalancer.java | 111 ++++++++++++++++---
.../master/balancer/GroupBalancerTest.java | 79 ++++++++++++-
2 files changed, 169 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0815aff/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
index 8feeb81..3ccf5ed 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/GroupBalancer.java
@@ -44,6 +44,7 @@ import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletMigration;
import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.io.Text;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@@ -68,6 +69,7 @@ import com.google.common.collect.Table;
public abstract class GroupBalancer extends TabletBalancer {
private final String tableId;
+ private final Text textTableId;
private long lastRun = 0;
/**
@@ -77,6 +79,7 @@ public abstract class GroupBalancer extends TabletBalancer {
public GroupBalancer(String tableId) {
this.tableId = tableId;
+ this.textTableId = new Text(tableId);
}
protected Iterable<Pair<KeyExtent,Location>> getLocationProvider() {
@@ -90,6 +93,31 @@ public abstract class GroupBalancer extends TabletBalancer {
return 60000;
}
+ /**
+ * The maximum number of migrations to perform in a single pass.
+ */
+ protected int getMaxMigrations() {
+ return 1000;
+ }
+
+ /**
+ * @return true if balancing should occur, as determined by examining the current tservers and migrations.
+ */
+ protected boolean shouldBalance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations) {
+
+ if (current.size() < 2) {
+ return false;
+ }
+
+ for (KeyExtent keyExtent : migrations) {
+ if (keyExtent.getTableId().equals(textTableId)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
Map<KeyExtent,TServerInstance> assignments) {
@@ -172,7 +200,7 @@ public abstract class GroupBalancer extends TabletBalancer {
// G2 | G2 | G2 | G2 | G2 | G2 <-- expected tablets for group 2
// G3 | G3 | G3 | G3 | G3 | G3 <-- expected tablets for group 3
- if (migrations.size() > 0 || current.size() < 2) {
+ if (!shouldBalance(current, migrations)) {
return 5000;
}
@@ -180,8 +208,6 @@ public abstract class GroupBalancer extends TabletBalancer {
return 5000;
}
- lastRun = System.currentTimeMillis();
-
MapCounter<String> groupCounts = new MapCounter<>();
Map<TServerInstance,TserverGroupInfo> tservers = new HashMap<>();
@@ -196,7 +222,7 @@ public abstract class GroupBalancer extends TabletBalancer {
String group = partitioner.apply(entry.getFirst());
Location loc = entry.getSecond();
- if (loc.equals(Location.NONE) || !current.containsKey(loc.getTserverInstance())) {
+ if (loc.equals(Location.NONE) || !tservers.containsKey(loc.getTserverInstance())) {
return 5000;
}
@@ -227,12 +253,21 @@ public abstract class GroupBalancer extends TabletBalancer {
Moves moves = new Moves();
+ // The order of the following steps is important: as ordered, no step should move a tablet that was already moved by a previous step.
balanceExpected(tservers, moves);
- balanceExtraExpected(tservers, expectedExtra, moves);
- balanceExtraMultiple(tservers, maxExtraGroups, moves);
- balanceExtraExtra(tservers, maxExtraGroups, moves);
+ if (moves.size() < getMaxMigrations()) {
+ balanceExtraExpected(tservers, expectedExtra, moves);
+ if (moves.size() < getMaxMigrations()) {
+ boolean cont = balanceExtraMultiple(tservers, maxExtraGroups, moves);
+ if (cont && moves.size() < getMaxMigrations()) {
+ balanceExtraExtra(tservers, maxExtraGroups, moves);
+ }
+ }
+ }
- populateMigrations(current, migrationsOut, moves);
+ populateMigrations(tservers.keySet(), migrationsOut, moves);
+
+ lastRun = System.currentTimeMillis();
return 5000;
}
@@ -266,7 +301,7 @@ public abstract class GroupBalancer extends TabletBalancer {
}
}
- private static class TserverGroupInfo {
+ static class TserverGroupInfo {
private Map<String,Integer> expectedCounts;
private final Map<String,MutableInt> initialCounts = new HashMap<>();
@@ -411,6 +446,7 @@ public abstract class GroupBalancer extends TabletBalancer {
private static class Moves {
private final Table<TServerInstance,String,List<Move>> moves = HashBasedTable.create();
+ private int totalMoves = 0;
public void move(String group, int num, TserverGroupInfo src, TserverGroupInfo dest) {
Preconditions.checkArgument(num > 0);
@@ -426,6 +462,7 @@ public abstract class GroupBalancer extends TabletBalancer {
}
srcMoves.add(new Move(dest, num));
+ totalMoves += num;
}
public TServerInstance removeMove(TServerInstance src, String group) {
@@ -436,6 +473,7 @@ public abstract class GroupBalancer extends TabletBalancer {
Move move = srcMoves.get(srcMoves.size() - 1);
TServerInstance ret = move.dest.getTserverInstance();
+ totalMoves--;
move.count--;
if (move.count == 0) {
@@ -447,6 +485,10 @@ public abstract class GroupBalancer extends TabletBalancer {
return ret;
}
+
+ public int size() {
+ return totalMoves;
+ }
}
private void balanceExtraExtra(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves) {
@@ -484,21 +526,28 @@ public abstract class GroupBalancer extends TabletBalancer {
serversGroupsToRemove.add(new Pair<String,TServerInstance>(group, srcTgi.getTserverInstance()));
}
- if (destTgi.getExtras().size() >= maxExtraGroups) {
+ if (destTgi.getExtras().size() >= maxExtraGroups || moves.size() >= getMaxMigrations()) {
break;
}
}
}
- surplusExtra.columnKeySet().removeAll(serversToRemove);
+ if (serversToRemove.size() > 0) {
+ surplusExtra.columnKeySet().removeAll(serversToRemove);
+ }
+
for (Pair<String,TServerInstance> pair : serversGroupsToRemove) {
surplusExtra.remove(pair.getFirst(), pair.getSecond());
}
+
+ if (moves.size() >= getMaxMigrations()) {
+ break;
+ }
}
}
}
- private void balanceExtraMultiple(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves) {
+ private boolean balanceExtraMultiple(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves) {
Multimap<String,TserverGroupInfo> extraMultiple = HashMultimap.create();
for (TserverGroupInfo tgi : tservers.values()) {
@@ -510,10 +559,23 @@ public abstract class GroupBalancer extends TabletBalancer {
}
}
+ balanceExtraMultiple(tservers, maxExtraGroups, moves, extraMultiple, false);
+ if (moves.size() < getMaxMigrations() && extraMultiple.size() > 0) {
+ // no place to move so must exceed maxExtra temporarily... subsequent balancer calls will smooth things out
+ balanceExtraMultiple(tservers, maxExtraGroups, moves, extraMultiple, true);
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ private void balanceExtraMultiple(Map<TServerInstance,TserverGroupInfo> tservers, int maxExtraGroups, Moves moves,
+ Multimap<String,TserverGroupInfo> extraMultiple, boolean alwaysAdd) {
+
ArrayList<Pair<String,TserverGroupInfo>> serversToRemove = new ArrayList<>();
for (TserverGroupInfo destTgi : tservers.values()) {
Map<String,Integer> extras = destTgi.getExtras();
- if (extras.size() < maxExtraGroups) {
+ if (alwaysAdd || extras.size() < maxExtraGroups) {
serversToRemove.clear();
for (String group : extraMultiple.keySet()) {
if (!extras.containsKey(group)) {
@@ -529,7 +591,7 @@ public abstract class GroupBalancer extends TabletBalancer {
serversToRemove.add(new Pair<String,TserverGroupInfo>(group, srcTgi));
}
- if (destTgi.getExtras().size() >= maxExtraGroups) {
+ if (destTgi.getExtras().size() >= maxExtraGroups || moves.size() >= getMaxMigrations()) {
break;
}
}
@@ -539,7 +601,7 @@ public abstract class GroupBalancer extends TabletBalancer {
extraMultiple.remove(pair.getFirst(), pair.getSecond());
}
- if (extraMultiple.size() == 0) {
+ if (extraMultiple.size() == 0 || moves.size() >= getMaxMigrations()) {
break;
}
}
@@ -591,7 +653,7 @@ public abstract class GroupBalancer extends TabletBalancer {
emptyServerGroups.add(new Pair<String,TServerInstance>(group, srcTgi.getTserverInstance()));
}
- if (destTgi.getExtras().size() >= expectedExtra) {
+ if (destTgi.getExtras().size() >= expectedExtra || moves.size() >= getMaxMigrations()) {
break;
}
}
@@ -605,6 +667,9 @@ public abstract class GroupBalancer extends TabletBalancer {
extraSurplus.remove(pair.getFirst(), pair.getSecond());
}
+ if (moves.size() >= getMaxMigrations()) {
+ break;
+ }
}
}
}
@@ -643,19 +708,26 @@ public abstract class GroupBalancer extends TabletBalancer {
numToMove -= transfer;
moves.move(group, transfer, surplusTsi, defecitTsi);
+ if (moves.size() >= getMaxMigrations()) {
+ return;
+ }
}
}
}
}
- private void populateMigrations(SortedMap<TServerInstance,TabletServerStatus> current, List<TabletMigration> migrationsOut, Moves moves) {
+ private void populateMigrations(Set<TServerInstance> current, List<TabletMigration> migrationsOut, Moves moves) {
+ if (moves.size() == 0) {
+ return;
+ }
+
Function<KeyExtent,String> partitioner = getPartitioner();
for (Pair<KeyExtent,Location> entry : getLocationProvider()) {
String group = partitioner.apply(entry.getFirst());
Location loc = entry.getSecond();
- if (loc.equals(Location.NONE) || !current.containsKey(loc.getTserverInstance())) {
+ if (loc.equals(Location.NONE) || !current.contains(loc.getTserverInstance())) {
migrationsOut.clear();
return;
}
@@ -663,6 +735,9 @@ public abstract class GroupBalancer extends TabletBalancer {
TServerInstance dest = moves.removeMove(loc.getTserverInstance(), group);
if (dest != null) {
migrationsOut.add(new TabletMigration(entry.getFirst(), loc.getTserverInstance(), dest));
+ if (moves.size() == 0) {
+ break;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b0815aff/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java
index 6e31001..39e454d 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/GroupBalancerTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -71,6 +72,10 @@ public class GroupBalancerTest {
}
public void balance() {
+ balance(10000);
+ }
+
+ public void balance(final int maxMigrations) {
GroupBalancer balancer = new GroupBalancer("1") {
@Override
@@ -94,12 +99,17 @@ public class GroupBalancerTest {
protected long getWaitTime() {
return 0;
}
+
+ @Override
+ protected int getMaxMigrations() {
+ return maxMigrations;
+ }
};
- balance(balancer);
+ balance(balancer, maxMigrations);
}
- public void balance(TabletBalancer balancer) {
+ public void balance(TabletBalancer balancer, int maxMigrations) {
while (true) {
Set<KeyExtent> migrations = new HashSet<>();
@@ -112,6 +122,8 @@ public class GroupBalancerTest {
balancer.balance(current, migrations, migrationsOut);
+ Assert.assertTrue("Max Migration exceeded " + maxMigrations + " " + migrationsOut.size(), migrationsOut.size() <= (maxMigrations + 5));
+
for (TabletMigration tabletMigration : migrationsOut) {
Assert.assertEquals(tabletLocs.get(tabletMigration.tablet), tabletMigration.oldServer);
Assert.assertTrue(tservers.contains(tabletMigration.newServer));
@@ -163,7 +175,8 @@ public class GroupBalancerTest {
int tserverExtra = 0;
for (String group : groupCounts.keySet()) {
Assert.assertTrue(tgc.get(group) >= expectedCounts.get(group));
- Assert.assertTrue(tgc.get(group) <= expectedCounts.get(group) + 1);
+ Assert.assertTrue("Group counts not as expected group:" + group + " actual:" + tgc.get(group) + " expected:" + (expectedCounts.get(group) + 1)
+ + " tserver:" + entry.getKey(), tgc.get(group) <= expectedCounts.get(group) + 1);
tserverExtra += tgc.get(group) - expectedCounts.get(group);
}
@@ -282,4 +295,64 @@ public class GroupBalancerTest {
}
}
}
+
+ @Test
+ public void testMaxMigrations() {
+
+ for (int max : new int[] {1, 2, 3, 7, 10, 30}) {
+ TabletServers tservers = new TabletServers();
+
+ for (int i = 1; i <= 9; i++) {
+ tservers.addTablet("01" + i, "192.168.1.1:9997");
+ }
+
+ for (int i = 1; i <= 4; i++) {
+ tservers.addTablet("02" + i, "192.168.1.2:9997");
+ }
+
+ for (int i = 1; i <= 5; i++) {
+ tservers.addTablet("03" + i, "192.168.1.3:9997");
+ }
+
+ tservers.addTservers("192.168.1.4:9997", "192.168.1.5:9997");
+
+ tservers.balance(max);
+ }
+ }
+
+ @Test
+ public void bigTest() {
+ TabletServers tservers = new TabletServers();
+ Random rand = new Random(42);
+
+ for (int g = 1; g <= 60; g++) {
+ for (int t = 1; t <= 241; t++) {
+ tservers.addTablet(String.format("%02d:%d", g, t), "192.168.1." + (rand.nextInt(249) + 1) + ":9997");
+ }
+ }
+
+ for (int i = 1; i <= 250; i++) {
+ tservers.addTserver("192.168.1." + i + ":9997");
+ }
+
+ tservers.balance(1000);
+ }
+
+ @Test
+ public void bigTest2() {
+ TabletServers tservers = new TabletServers();
+ Random rand = new Random(42);
+
+ for (int g = 1; g <= 60; g++) {
+ for (int t = 1; t <= rand.nextInt(1000); t++) {
+ tservers.addTablet(String.format("%02d:%d", g, t), "192.168.1." + (rand.nextInt(249) + 1) + ":9997");
+ }
+ }
+
+ for (int i = 1; i <= 250; i++) {
+ tservers.addTserver("192.168.1." + i + ":9997");
+ }
+
+ tservers.balance(1000);
+ }
}