You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/01/28 19:46:36 UTC

git commit: ACCUMULO-2261 remove unexpected future and current location entries for dead servers from the metadata tables

Updated Branches:
  refs/heads/1.6.0-SNAPSHOT 783e895fb -> f5c35b950


ACCUMULO-2261 remove unexpected future and current location entries for dead servers from the metadata tables


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f5c35b95
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f5c35b95
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f5c35b95

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: f5c35b9502ba107b8384a0760b1b9ba8e056ef85
Parents: 783e895
Author: Eric Newton <er...@gmail.com>
Authored: Tue Jan 28 13:46:31 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Tue Jan 28 13:46:37 2014 -0500

----------------------------------------------------------------------
 .../master/state/MetaDataTableScanner.java      |  11 +-
 .../master/state/TabletLocationState.java       |   8 +-
 .../accumulo/master/TabletGroupWatcher.java     |  63 +++++++-
 .../test/MasterRepairsDualAssignmentIT.java     | 146 +++++++++++++++++++
 4 files changed, 216 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5c35b95/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index c03b5b4..90a9d19 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -123,14 +123,8 @@ public class MetaDataTableScanner implements Iterator<TabletLocationState> {
   
   @Override
   public TabletLocationState next() {
-    try {
-      return fetch();
-    } catch (RuntimeException ex) {
-      // something is wrong with the metadata records, just skip over it
-      log.error(ex, ex);
-      mdScanner.close();
-      return null;
-    }
+    // do not swallow errors here: inconsistent metadata must reach the caller instead of being silently skipped
+    return fetch();
   }
   
   public static TabletLocationState createTabletLocationState(Key k, Value v) throws IOException, BadLocationStateException {
@@ -152,13 +146,13 @@ public class MetaDataTableScanner implements Iterator<TabletLocationState> {
       if (cf.compareTo(TabletsSection.FutureLocationColumnFamily.NAME) == 0) {
         TServerInstance location = new TServerInstance(entry.getValue(), cq);
         if (future != null) {
-          throw new BadLocationStateException("found two assignments for the same extent " + key.getRow() + ": " + future + " and " + location);
+          throw new BadLocationStateException("found two assignments for the same extent " + key.getRow() + ": " + future + " and " + location, entry.getKey().getRow());
         }
         future = location;
       } else if (cf.compareTo(TabletsSection.CurrentLocationColumnFamily.NAME) == 0) {
         TServerInstance location = new TServerInstance(entry.getValue(), cq);
         if (current != null) {
-          throw new BadLocationStateException("found two locations for the same extent " + key.getRow() + ": " + current + " and " + location);
+          throw new BadLocationStateException("found two locations for the same extent " + key.getRow() + ": " + current + " and " + location, entry.getKey().getRow());
         }
         current = location;
       } else if (cf.compareTo(LogColumnFamily.NAME) == 0) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5c35b95/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index bcfaead..5432d32 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.Set;
 
 import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.hadoop.io.Text;
 
 /**
  * When a tablet is assigned, we mark its future location. When the tablet is opened, we set its current location. A tablet should never have both a future and
@@ -33,8 +34,13 @@ public class TabletLocationState {
   
   static public class BadLocationStateException extends Exception {
     private static final long serialVersionUID = 1L;
+    // Hadoop's Text is not Serializable, so the row is transient: serializing this exception must not fail, and a deserialized copy carries no row
+    private final transient Text metadataTableEntry;
 
-    BadLocationStateException(String msg) { super(msg); }
+    BadLocationStateException(String msg, Text row) { super(msg); this.metadataTableEntry = row; }
+
+    /** @return the metadata table row of the inconsistent tablet; null if none was supplied or this exception was deserialized */
+    public Text getEncodedEndRow() { return metadataTableEntry; }
   }
   
   public TabletLocationState(KeyExtent extent, TServerInstance future, TServerInstance current, TServerInstance last, Collection<Collection<String>> walogs,
@@ -48,7 +54,7 @@ public class TabletLocationState {
     this.walogs = walogs;
     this.chopped = chopped;
     if (current != null && future != null) {
-      throw new BadLocationStateException(extent + " is both assigned and hosted, which should never happen: " + this);
+      throw new BadLocationStateException(extent + " is both assigned and hosted, which should never happen: " + this, extent.getMetadataEntry());
     }
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5c35b95/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 8609105..1a201f5 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -50,7 +50,9 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.util.Daemon;
@@ -69,6 +71,7 @@ import org.apache.accumulo.server.master.state.MergeInfo;
 import org.apache.accumulo.server.master.state.MergeState;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
 import org.apache.accumulo.server.master.state.TabletState;
 import org.apache.accumulo.server.master.state.TabletStateStore;
 import org.apache.accumulo.server.security.SystemCredentials;
@@ -78,6 +81,8 @@ import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
 
+import com.google.common.collect.Iterators;
+
 class TabletGroupWatcher extends Daemon {
   
   private final Master master;
@@ -276,11 +281,89 @@ class TabletGroupWatcher extends Daemon {
         eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
       } catch (Exception ex) {
         Master.log.error("Error processing table state for store " + store.name(), ex);
-        UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
+        Throwable cause = ex.getCause();
+        boolean repaired = false;
+        if (cause instanceof BadLocationStateException) {
+          repaired = repairMetadata(((BadLocationStateException) cause).getEncodedEndRow());
+        }
+        // skip the error back-off only when the repair actually changed the metadata; otherwise an
+        // inconsistency that cannot be repaired would be retried with no delay at all
+        if (!repaired) {
+          UtilWaitThread.sleep(Master.WAIT_BETWEEN_ERRORS);
+        }
       }
     }
   }
   
+  /**
+   * Attempts to repair a tablet whose metadata row has conflicting future/current location entries by deleting one entry that refers to a tablet server
+   * which is no longer alive (future locations are checked before current ones).
+   * 
+   * @param row
+   *          the metadata row of the inconsistent tablet; if null, nothing is done
+   * @return true if a location entry was deleted, false if the metadata was left unchanged
+   */
+  private boolean repairMetadata(Text row) {
+    Master.log.debug("Attempting repair on " + row);
+    // ACCUMULO-2261 if a dying tserver writes a location before its lock information propagates, it may cause duplicate assignment.
+    // Attempt to find the dead server entry and remove it.
+    if (row == null) {
+      Master.log.warn("Unable to repair metadata: the row of the inconsistent tablet is unknown");
+      return false;
+    }
+    try {
+      Map<Key, Value> future = new HashMap<Key, Value>();
+      Map<Key, Value> assigned = new HashMap<Key, Value>();
+      KeyExtent extent = new KeyExtent(row, new Value(new byte[]{0}));
+      String table = MetadataTable.NAME;
+      if (extent.isMeta())
+        table = RootTable.NAME;
+      Scanner scanner = this.master.getConnector().createScanner(table, Authorizations.EMPTY);
+      scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+      scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+      scanner.setRange(new Range(row));
+      for (Entry<Key,Value> entry : scanner) {
+        if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
+          assigned.put(entry.getKey(), entry.getValue());
+        } else if (entry.getKey().getColumnFamily().equals(FutureLocationColumnFamily.NAME)) {
+          future.put(entry.getKey(), entry.getValue());
+        }
+      }
+      if (future.size() > 0 && assigned.size() > 0) {
+        Master.log.warn("Found a tablet assigned and hosted, attempting to repair");
+      } else if (future.size() > 1 && assigned.size() == 0) {
+        Master.log.warn("Found a tablet assigned to multiple servers, attempting to repair");
+      } else if (future.size() == 0 && assigned.size() > 1) {
+        Master.log.warn("Found a tablet hosted on multiple servers, attempting to repair");
+      } else {
+        Master.log.info("Attempted a repair, but nothing seems to be obviously wrong. " + assigned + " " + future);
+        return false;
+      }
+      Iterator<Entry<Key, Value>> iter = Iterators.concat(future.entrySet().iterator(), assigned.entrySet().iterator());
+      while (iter.hasNext()) {
+        Entry<Key, Value> entry = iter.next();
+        TServerInstance alive = master.tserverSet.find(entry.getValue().toString());
+        if (alive == null) {
+          Master.log.info("Removing entry " + entry);
+          BatchWriter bw = this.master.getConnector().createBatchWriter(table, new BatchWriterConfig());
+          try {
+            Mutation m = new Mutation(entry.getKey().getRow());
+            m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+            bw.addMutation(m);
+          } finally {
+            // close even if addMutation fails so the writer's resources are released
+            bw.close();
+          }
+          return true;
+        }
+      }
+      Master.log.error("Metadata table is inconsistent at " + row + " and all assigned/future tservers are still online.");
+    } catch (Exception e) {
+      Master.log.error("Error attempting repair of metadata " + row + ": " + e, e);
+    }
+    return false;
+  }
+
   private int assignedOrHosted() {
     int result = 0;
     for (TableCounts counts : stats.getLast().values()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f5c35b95/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java b/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
new file mode 100644
index 0000000..afd1403
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.master.state.MetaDataStateStore;
+import org.apache.accumulo.server.master.state.RootTabletStateStore;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.master.state.TabletLocationState;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * ACCUMULO-2261: checks that the master removes a location entry left behind for a dead tablet server, for both a user table tablet and a metadata tablet.
+ */
+public class MasterRepairsDualAssignmentIT extends ConfigurableMacIT {
+  
+  
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.MASTER_RECOVERY_DELAY, "5s");
+  }
+
+  @Test(timeout = 5 * 60 * 1000)
+  public void test() throws Exception {
+    // make some tablets, spread 'em around
+    Connector c = getConnector();
+    Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD));
+    String table = this.getTableNames(1)[0];
+    c.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+    c.securityOperations().grantTablePermission("root", RootTable.NAME, TablePermission.WRITE);
+    c.tableOperations().create(table);
+    SortedSet<Text> partitions = new TreeSet<Text>();
+    for (String part : "a b c d e f g h i j k l m n o p q r s t u v w x y z".split(" ")) {
+      partitions.add(new Text(part));
+    }
+    c.tableOperations().addSplits(table, partitions);
+    // wait until tablets are hosted on two tablet servers; both sets are rebuilt from the same scan so they always agree
+    Set<TServerInstance> states = new HashSet<TServerInstance>();
+    Set<TabletLocationState> oldLocations = new HashSet<TabletLocationState>();
+    MetaDataStateStore store = new MetaDataStateStore(c.getInstance(), creds, null);
+    while (states.size() < 2) {
+      UtilWaitThread.sleep(250);
+      states.clear();
+      oldLocations.clear();
+      for (TabletLocationState tls : store) {
+        if (tls.current != null) {
+          states.add(tls.current);
+          oldLocations.add(tls);
+        }
+      }
+    }
+    assertEquals(2, states.size());
+    // Kill a tablet server... we don't care which one... wait for everything to be reassigned
+    cluster.killProcess(ServerType.TABLET_SERVER, cluster.getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+    // Find out which tablet server remains
+    while (true) {
+      UtilWaitThread.sleep(1000);
+      states.clear();
+      boolean allAssigned = true;
+      for (TabletLocationState tls : store) {
+        if (tls != null && tls.current != null) {
+          states.add(tls.current);
+        } else {
+          allAssigned = false;
+        }
+      }
+      System.out.println(states + " size " + states.size() + " allAssigned " + allAssigned);
+      if (states.size() != 2 && allAssigned)
+        break;
+    }
+    assertEquals(1, states.size());
+    // pick a tablet that was hosted on the killed tablet server and write its old location back, as a dying server might
+    TabletLocationState moved = null;
+    for (TabletLocationState old : oldLocations) {
+      if (!states.contains(old.current)) {
+        moved = old;
+      }
+    }
+    assertNotNull(moved);
+    // throw a mutation in as if we were the dying tablet server
+    BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation assignment = new Mutation(moved.extent.getMetadataEntry());
+    assignment.put(CurrentLocationColumnFamily.NAME, moved.current.asColumnQualifier(), moved.current.asMutationValue());
+    bw.addMutation(assignment);
+    bw.close();
+    // wait for the master to fix the problem
+    waitForCleanStore(store);
+    // now jam up the metadata table
+    bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    assignment = new Mutation(new KeyExtent(new Text(MetadataTable.ID), null, null).getMetadataEntry());
+    assignment.put(CurrentLocationColumnFamily.NAME, moved.current.asColumnQualifier(), moved.current.asMutationValue());
+    bw.addMutation(assignment);
+    bw.close();
+    waitForCleanStore(new RootTabletStateStore(c.getInstance(), creds, null));
+  }
+
+  /** Repeatedly scans {@code store}, sleeping between attempts, until a full pass completes without throwing an exception. */
+  private void waitForCleanStore(MetaDataStateStore store) {
+    while (true) {
+      try {
+        for (@SuppressWarnings("unused") TabletLocationState tls : store)
+          ;
+      } catch (Exception ex) {
+        System.out.println(ex);
+        UtilWaitThread.sleep(250);
+        continue;
+      }
+      break;
+    }
+  }
+
+}