You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/02/19 21:51:22 UTC
accumulo git commit: ACCUMULO-3601 transfer migration information to
the metadata table filtering iterator
Repository: accumulo
Updated Branches:
refs/heads/1.6 7a570bd88 -> 4936b37f6
ACCUMULO-3601 transfer migration information to the metadata table filtering iterator
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4936b37f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4936b37f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4936b37f
Branch: refs/heads/1.6
Commit: 4936b37f6dc5bee2ba6c6b41b8f12c7e02a493f2
Parents: 7a570bd
Author: Eric Newton <Eric Newton>
Authored: Thu Feb 19 15:38:48 2015 -0500
Committer: Eric Newton <Eric Newton>
Committed: Thu Feb 19 15:38:48 2015 -0500
----------------------------------------------------------------------
.../server/master/state/CurrentState.java | 3 +
.../master/state/MetaDataTableScanner.java | 2 +
.../master/state/TabletStateChangeIterator.java | 40 +++++++++
.../java/org/apache/accumulo/master/Master.java | 5 ++
.../apache/accumulo/master/TestMergeState.java | 5 ++
.../org/apache/accumulo/test/BalanceIT.java | 87 ++++++++++++++++++++
.../functional/TabletStateChangeIteratorIT.java | 5 ++
7 files changed, 147 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
index 501d66a..b07a931 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
@@ -19,6 +19,8 @@ package org.apache.accumulo.server.master.state;
import java.util.Collection;
import java.util.Set;
+import org.apache.accumulo.core.data.KeyExtent;
+
public interface CurrentState {
Set<String> onlineTables();
@@ -27,4 +29,5 @@ public interface CurrentState {
Collection<MergeInfo> merges();
+ Collection<KeyExtent> migrations();
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index 025f082..8adce32 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -86,6 +86,7 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
TabletStateChangeIterator.setMerges(tabletChange, state.merges());
+ TabletStateChangeIterator.setMigrations(tabletChange, state.migrations());
}
scanner.addScanIterator(tabletChange);
}
@@ -98,6 +99,7 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
this(instance, credentials, range, null, tableName);
}
+ @Override
public void close() {
if (iter != null) {
mdScanner.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
index 048884c..133013c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -50,12 +51,14 @@ public class TabletStateChangeIterator extends SkippingIterator {
private static final String TABLES_OPTION = "tables";
private static final String MERGES_OPTION = "merges";
private static final String DEBUG_OPTION = "debug";
+ private static final String MIGRATIONS_OPTION = "migrations";
private static final Logger log = Logger.getLogger(TabletStateChangeIterator.class);
Set<TServerInstance> current;
Set<String> onlineTables;
Map<Text,MergeInfo> merges;
boolean debug = false;
+ Set<KeyExtent> migrations;
@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
@@ -64,6 +67,26 @@ public class TabletStateChangeIterator extends SkippingIterator {
onlineTables = parseTables(options.get(TABLES_OPTION));
merges = parseMerges(options.get(MERGES_OPTION));
debug = options.containsKey(DEBUG_OPTION);
+ migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+ }
+
+ private Set<KeyExtent> parseMigrations(String migrations) { // decodes the MIGRATIONS_OPTION value into the set of migrating extents
+ if (migrations == null) // option was not supplied to the iterator: treat as "no migrations"
+ return Collections.emptySet();
+ try {
+ Set<KeyExtent> result = new HashSet<KeyExtent>();
+ DataInputBuffer buffer = new DataInputBuffer();
+ byte[] data = Base64.decodeBase64(migrations.getBytes(UTF_8)); // option text is Base64, matching what setMigrations stores
+ buffer.reset(data, data.length);
+ while (buffer.available() > 0) { // extents are serialized back-to-back with no count prefix; read until the buffer is drained
+ KeyExtent extent = new KeyExtent();
+ extent.readFields(buffer);
+ result.add(extent);
+ }
+ return result;
+ } catch (Exception ex) { // any decode/deserialize failure is rethrown unchecked with the original exception as its cause
+ throw new RuntimeException(ex);
+ }
}
private Set<String> parseTables(String tables) {
@@ -136,6 +159,10 @@ public class TabletStateChangeIterator extends SkippingIterator {
// could make this smarter by only returning if the tablet is involved in the merge
return;
}
+ // always return the information for migrating tablets
+ if (migrations.contains(tls.extent)) {
+ return;
+ }
// is the table supposed to be online or offline?
boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId().toString());
@@ -200,4 +227,17 @@ public class TabletStateChangeIterator extends SkippingIterator {
cfg.addOption(MERGES_OPTION, encoded);
}
+ public static void setMigrations(IteratorSetting cfg, Collection<KeyExtent> migrations) { // stores the migrating extents on cfg under MIGRATIONS_OPTION
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ try {
+ for (KeyExtent extent : migrations) { // extents are written one after another, in the collection's iteration order
+ extent.write(buffer);
+ }
+ } catch (Exception ex) { // serialization failures are rethrown unchecked with the original exception as the cause
+ throw new RuntimeException(ex);
+ }
+ String encoded = Base64.encodeBase64String(Arrays.copyOf(buffer.getData(), buffer.getLength())); // encode only the first getLength() bytes; presumably getData() can be longer than what was written
+ cfg.addOption(MIGRATIONS_OPTION, encoded); // parseMigrations reads this option back in init()
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 2989608..bc552bc 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -1325,4 +1325,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
}
}
}
+
+ @Override
+ public Collection<KeyExtent> migrations() { // exposes the extents that are keys of the migrations field
+ return migrations.keySet(); // NOTE(review): if migrations is a java.util.Map this is a live view, not a copy — confirm callers that iterate it do not need a snapshot
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
index eddbe15..b84df2b 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -80,6 +80,11 @@ public class TestMergeState {
public Collection<MergeInfo> merges() {
return Collections.singleton(mergeInfo);
}
+
+ @Override
+ public Collection<KeyExtent> migrations() { // this test state reports no migrations
+ return Collections.emptyList();
+ }
}
private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
new file mode 100644
index 0000000..7c8e452
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class BalanceIT extends ConfigurableMacIT { // integration test: create a table, add splits, then wait until tablet locations look evenly spread
+
+ @Test(timeout = 60 * 1000) // JUnit timeout in milliseconds (60 s); this is the only bound on the polling loop in waitForBalance
+ public void testBalance() throws Exception {
+ String tableName = getUniqueNames(1)[0];
+ Connector c = getConnector();
+ System.out.println("Creating table");
+ c.tableOperations().create(tableName);
+ SortedSet<Text> splits = new TreeSet<Text>();
+ for (int i = 0; i < 10; i++) { // split points are the strings "0" through "9"
+ splits.add(new Text("" + i));
+ }
+ System.out.println("Adding splits");
+ c.tableOperations().addSplits(tableName, splits);
+ System.out.println("Waiting for balance");
+ waitForBalance(c); // the test has no explicit assertion: it passes if this returns before the timeout
+ }
+
+ private void waitForBalance(Connector c) throws Exception { // polls isBalanced, sleeping 1000 ms between attempts; never gives up on its own
+ while (!isBalanced(c)) {
+ UtilWaitThread.sleep(1000);
+ }
+ }
+
+ private boolean isBalanced(Connector c) throws Exception { // true when at least two hosts hold tablets and every host's count is within 2 of the first host examined
+ Map<String, Integer> counts = new HashMap<String, Integer>(); // current-location entries seen per host
+ Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.setRange(MetadataSchema.TabletsSection.getRange()); // scans the whole tablets section, not just the table created by the test
+ scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME);
+ for (Entry<Key,Value> entry : scanner) {
+ String host = entry.getKey().getColumnQualifier().toString(); // the column qualifier is used as the host identity
+ Integer count = counts.get(host);
+ if (count == null) { // first entry seen for this host
+ count = new Integer(0);
+ }
+ counts.put(host, count.intValue() + 1);
+ }
+ if (counts.size() < 2) { // fewer than two distinct hosts is never treated as balanced
+ return false;
+ }
+ Iterator<Integer> iter = counts.values().iterator();
+ int initial = iter.next().intValue(); // reference count: whichever host the HashMap iterator yields first
+ while (iter.hasNext()) {
+ if (Math.abs(iter.next().intValue() - initial) > 2) // NOTE(review): compared to the reference only, so two other hosts may differ from each other by up to 4
+ return false;
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
index ffd7636..3adf6c0 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
@@ -165,6 +165,11 @@ public class TabletStateChangeIteratorIT extends SharedMiniClusterIT {
public Collection<MergeInfo> merges() {
return Collections.emptySet();
}
+
+ @Override
+ public Collection<KeyExtent> migrations() { // this test state reports no migrations
+ return Collections.emptyList();
+ }
}
}