You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/02/19 21:51:22 UTC

accumulo git commit: ACCUMULO-3601 transfer migration information to the metadata table filtering iterator

Repository: accumulo
Updated Branches:
  refs/heads/1.6 7a570bd88 -> 4936b37f6


ACCUMULO-3601 transfer migration information to the metadata table filtering iterator


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4936b37f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4936b37f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4936b37f

Branch: refs/heads/1.6
Commit: 4936b37f6dc5bee2ba6c6b41b8f12c7e02a493f2
Parents: 7a570bd
Author: Eric Newton <Eric Newton>
Authored: Thu Feb 19 15:38:48 2015 -0500
Committer: Eric Newton <Eric Newton>
Committed: Thu Feb 19 15:38:48 2015 -0500

----------------------------------------------------------------------
 .../server/master/state/CurrentState.java       |  3 +
 .../master/state/MetaDataTableScanner.java      |  2 +
 .../master/state/TabletStateChangeIterator.java | 40 +++++++++
 .../java/org/apache/accumulo/master/Master.java |  5 ++
 .../apache/accumulo/master/TestMergeState.java  |  5 ++
 .../org/apache/accumulo/test/BalanceIT.java     | 87 ++++++++++++++++++++
 .../functional/TabletStateChangeIteratorIT.java |  5 ++
 7 files changed, 147 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
index 501d66a..b07a931 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java
@@ -19,6 +19,8 @@ package org.apache.accumulo.server.master.state;
 import java.util.Collection;
 import java.util.Set;
 
+import org.apache.accumulo.core.data.KeyExtent;
+
 public interface CurrentState {
 
   Set<String> onlineTables();
@@ -27,4 +29,5 @@ public interface CurrentState {
 
   Collection<MergeInfo> merges();
 
+  Collection<KeyExtent> migrations(); // extents of migrating tablets; MetaDataTableScanner passes these to TabletStateChangeIterator.setMigrations
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index 025f082..8adce32 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -86,6 +86,7 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
       TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
       TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables());
       TabletStateChangeIterator.setMerges(tabletChange, state.merges());
+      TabletStateChangeIterator.setMigrations(tabletChange, state.migrations());
     }
     scanner.addScanIterator(tabletChange);
   }
@@ -98,6 +99,7 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
     this(instance, credentials, range, null, tableName);
   }
 
+  @Override
   public void close() {
     if (iter != null) {
       mdScanner.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
index 048884c..133013c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateChangeIterator.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -50,12 +51,14 @@ public class TabletStateChangeIterator extends SkippingIterator {
   private static final String TABLES_OPTION = "tables";
   private static final String MERGES_OPTION = "merges";
   private static final String DEBUG_OPTION = "debug";
+  private static final String MIGRATIONS_OPTION = "migrations";
   private static final Logger log = Logger.getLogger(TabletStateChangeIterator.class);
 
   Set<TServerInstance> current;
   Set<String> onlineTables;
   Map<Text,MergeInfo> merges;
   boolean debug = false;
+  Set<KeyExtent> migrations;
 
   @Override
   public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
@@ -64,6 +67,26 @@ public class TabletStateChangeIterator extends SkippingIterator {
     onlineTables = parseTables(options.get(TABLES_OPTION));
     merges = parseMerges(options.get(MERGES_OPTION));
     debug = options.containsKey(DEBUG_OPTION);
+    migrations = parseMigrations(options.get(MIGRATIONS_OPTION));
+  }
+
+  private Set<KeyExtent> parseMigrations(String migrations) { // decodes the MIGRATIONS_OPTION string back into a set of extents
+    if (migrations == null) // option absent from the iterator options map
+      return Collections.emptySet();
+    try {
+      Set<KeyExtent> result = new HashSet<KeyExtent>();
+      DataInputBuffer buffer = new DataInputBuffer();
+      byte[] data = Base64.decodeBase64(migrations.getBytes(UTF_8)); // undoes the Base64 encoding applied by setMigrations
+      buffer.reset(data, data.length);
+      while (buffer.available() > 0) { // extents are stored back-to-back with no count prefix, so read until drained
+        KeyExtent extent = new KeyExtent();
+        extent.readFields(buffer);
+        result.add(extent);
+      }
+      return result;
+    } catch (Exception ex) { // any failure while decoding is rethrown unchecked, keeping the cause
+      throw new RuntimeException(ex);
+    }
   }
 
   private Set<String> parseTables(String tables) {
@@ -136,6 +159,10 @@ public class TabletStateChangeIterator extends SkippingIterator {
         // could make this smarter by only returning if the tablet is involved in the merge
         return;
       }
+      // always return the information for migrating tablets
+      if (migrations.contains(tls.extent)) {
+        return;
+      }
 
       // is the table supposed to be online or offline?
       boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId().toString());
@@ -200,4 +227,17 @@ public class TabletStateChangeIterator extends SkippingIterator {
     cfg.addOption(MERGES_OPTION, encoded);
   }
 
+  public static void setMigrations(IteratorSetting cfg, Collection<KeyExtent> migrations) { // stores the given extents on cfg under MIGRATIONS_OPTION
+    DataOutputBuffer buffer = new DataOutputBuffer();
+    try {
+      for (KeyExtent  extent : migrations) {
+        extent.write(buffer); // written back-to-back; parseMigrations reads until its input buffer is empty
+      }
+    } catch (Exception ex) { // any failure while serializing is rethrown unchecked, keeping the cause
+      throw new RuntimeException(ex);
+    }
+    String encoded = Base64.encodeBase64String(Arrays.copyOf(buffer.getData(), buffer.getLength())); // copy only getLength() bytes of the buffer's data before encoding
+    cfg.addOption(MIGRATIONS_OPTION, encoded);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 2989608..bc552bc 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -1325,4 +1325,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
       }
     }
   }
+
+  @Override
+  public Collection<KeyExtent> migrations() { // exposes the keys of the migrations field
+    return migrations.keySet(); // returned as-is, no copy is made here
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
index eddbe15..b84df2b 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java
@@ -80,6 +80,11 @@ public class TestMergeState {
     public Collection<MergeInfo> merges() {
       return Collections.singleton(mergeInfo);
     }
+
+    @Override
+    public Collection<KeyExtent> migrations() { // this test state reports no migrations
+      return Collections.emptyList();
+    }
   }
 
   private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
new file mode 100644
index 0000000..7c8e452
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class BalanceIT extends ConfigurableMacIT { // integration test: creates a table, splits it, then waits until isBalanced() holds
+
+  @Test(timeout = 60 * 1000) // this timeout is the only bound on the polling loop in waitForBalance
+  public void testBalance() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    Connector c = getConnector();
+    System.out.println("Creating table");
+    c.tableOperations().create(tableName);
+    SortedSet<Text> splits = new TreeSet<Text>();
+    for (int i = 0; i < 10; i++) {
+      splits.add(new Text("" + i)); // split points "0" through "9"
+    }
+    System.out.println("Adding splits");
+    c.tableOperations().addSplits(tableName, splits);
+    System.out.println("Waiting for balance");
+    waitForBalance(c);
+  }
+
+  private void waitForBalance(Connector c) throws Exception { // polls isBalanced, sleeping 1000 between attempts; has no exit condition of its own
+    while (!isBalanced(c)) {
+      UtilWaitThread.sleep(1000);
+    }
+  }
+
+  private boolean isBalanced(Connector c) throws Exception { // true when at least two groups exist and each count is within 2 of the first
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+    Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(MetadataSchema.TabletsSection.getRange()); // whole tablets section: not restricted to the table created by the test
+    scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); // only current-location entries
+    for (Entry<Key,Value> entry : scanner) {
+      String host = entry.getKey().getColumnQualifier().toString(); // grouping key is the column qualifier; NOTE(review): confirm it identifies the server
+      Integer count = counts.get(host);
+      if (count == null) { // first entry seen for this key
+        count = new Integer(0);
+      }
+      counts.put(host, count.intValue() + 1);
+    }
+    if (counts.size() < 2) { // fewer than two distinct keys seen: treated as not balanced
+      return false;
+    }
+    Iterator<Integer> iter = counts.values().iterator();
+    int initial = iter.next().intValue(); // every other count is compared against this first one only, not pairwise
+    while (iter.hasNext()) {
+      if (Math.abs(iter.next().intValue() - initial) > 2) // a difference of up to 2 entries is tolerated
+        return false;
+    }
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
index ffd7636..3adf6c0 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java
@@ -165,6 +165,11 @@ public class TabletStateChangeIteratorIT extends SharedMiniClusterIT {
     public Collection<MergeInfo> merges() {
       return Collections.emptySet();
     }
+
+    @Override
+    public Collection<KeyExtent> migrations() { // this test state reports no migrations
+      return Collections.emptyList();
+    }
   }
 
 }