Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/02/27 23:09:59 UTC

svn commit: r1450993 - in /accumulo/branches/1.5: core/src/main/java/org/apache/accumulo/core/conf/ server/src/main/java/org/apache/accumulo/server/master/ server/src/main/java/org/apache/accumulo/server/master/recovery/ server/src/main/java/org/apache...

Author: kturner
Date: Wed Feb 27 22:09:58 2013
New Revision: 1450993

URL: http://svn.apache.org/r1450993
Log:
ACCUMULO-1077 Fixed a deadlock in walog recovery and made walog recovery retry with backoff; previously, failed log recoveries were never retried.

Added:
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
Removed:
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRRecoverLease.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
Modified:
    accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
    accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java

Modified: accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1450993&r1=1450992&r2=1450993&view=diff
==============================================================================
--- accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java Wed Feb 27 22:09:58 2013
@@ -105,7 +105,7 @@ public enum Property {
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
   MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
       "When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
-  MASTER_LEASE_RECOVERY_IMPLEMETATION("master.lease.recovery.implementation", "org.apache.accumulo.server.master.recovery.RecoverLease",
+  MASTER_WALOG_CLOSER_IMPLEMETATION("master.walog.closer.implementation", "org.apache.accumulo.server.master.recovery.HadoopLogCloser",
       PropertyType.CLASSNAME, "A class that implements a mechanism to steal write access to a file"),
   MASTER_FATE_THREADPOOL_SIZE("master.fate.threadpool.size", "4", PropertyType.COUNT,
       "The number of threads used to run FAult-Tolerant Executions.  These are primarily table operations like merge."),

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1450993&r1=1450992&r2=1450993&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Feb 27 22:09:58 2013
@@ -102,7 +102,7 @@ import org.apache.accumulo.server.conf.S
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 import org.apache.accumulo.server.master.balancer.TabletBalancer;
-import org.apache.accumulo.server.master.recovery.RecoverLease;
+import org.apache.accumulo.server.master.recovery.RecoveryManager;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.CurrentState;
 import org.apache.accumulo.server.master.state.DeadServerList;
@@ -158,7 +158,6 @@ import org.apache.accumulo.start.classlo
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -203,6 +202,7 @@ public class Master implements LiveTServ
   final private SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
   final private EventCoordinator nextEvent = new EventCoordinator();
   final private Object mergeLock = new Object();
+  private RecoveryManager recoveryManager = null;
   
   private ZooLock masterLock = null;
   private TServer clientService = null;
@@ -1344,7 +1344,7 @@ public class Master implements LiveTServ
             
             if (goal == TabletGoalState.HOSTED) {
               if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
-                if (recoverLogs(tls.extent, tls.walogs))
+                if (recoveryManager.recoverLogs(tls.extent, tls.walogs))
                   continue;
               }
               switch (state) {
@@ -2036,39 +2036,13 @@ public class Master implements LiveTServ
     return result;
   }
   
-  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
-    boolean recoveryNeeded = false;
-    for (Collection<String> logs : walogs) {
-      for (String log : logs) {
-        String parts[] = log.split("/");
-        String host = parts[0];
-        String filename = parts[1];
-        if (fs.exists(new Path(Constants.getRecoveryDir(getSystemConfiguration()) + "/" + filename + "/finished"))) {
-          recoveriesInProgress.remove(filename);
-          continue;
-        }
-        recoveryNeeded = true;
-        synchronized (recoveriesInProgress) {
-          if (!recoveriesInProgress.contains(filename)) {
-            Master.log.info("Starting recovery of " + filename + " created for " + host + ", tablet " + extent + " holds a reference");
-            AccumuloConfiguration aconf = getConfiguration().getConfiguration();
-            RecoverLease impl = createInstanceFromPropertyName(aconf, Property.MASTER_LEASE_RECOVERY_IMPLEMETATION, RecoverLease.class, new RecoverLease());
-            impl.init(host, filename);
-            long tid = fate.startTransaction();
-            fate.seedTransaction(tid, impl, true);
-            recoveriesInProgress.add(filename);
-          }
-        }
-      }
-    }
-    return recoveryNeeded;
-  }
-  
   public void run() throws IOException, InterruptedException, KeeperException {
     final String zroot = ZooUtil.getRoot(instance);
     
     getMasterLock(zroot + Constants.ZMASTER_LOCK);
     
+    recoveryManager = new RecoveryManager(this);
+
     TableManager.getInstance().addObserver(this);
     
     StatusThread statusThread = new StatusThread();

Added: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java?rev=1450993&view=auto
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java (added)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java Wed Feb 27 22:09:58 2013
@@ -0,0 +1,40 @@
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.log4j.Logger;
+
+public class HadoopLogCloser implements LogCloser {
+  
+  private static Logger log = Logger.getLogger(HadoopLogCloser.class);
+
+  @Override
+  public long close(FileSystem fs, Path source) throws IOException {
+    
+    if (fs instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      try {
+        if (!dfs.recoverLease(source)) {
+          log.info("Waiting for file to be closed " + source.toString());
+          return 1000;
+        }
+        log.info("Recovered lease on " + source.toString());
+        return 0;
+      } catch (IOException ex) {
+        log.warn("Error recovery lease on " + source.toString(), ex);
+      }
+    } else if (fs instanceof LocalFileSystem) {
+      // ignore
+    } else {
+      throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
+    }
+    fs.append(source).close();
+    log.info("Recovered lease on " + source.toString() + " using append");
+    return 0;
+  }
+  
+}

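To make the close() contract above concrete, here is a minimal, hypothetical driver (not part of this commit) that polls a LogCloser until it reports the write-ahead log as closed; RecoveryManager below does the same thing, but by rescheduling a LogSortTask on its executor instead of sleeping. The standalone main and the command-line path argument are assumptions for illustration only.

    package org.apache.accumulo.server.master.recovery;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LogCloserDriverSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path walog = new Path(args[0]);           // an unclosed write-ahead log
        LogCloser closer = new HadoopLogCloser(); // or MapRLogCloser, per the new property
        long delay;
        while ((delay = closer.close(fs, walog)) > 0) {
          Thread.sleep(delay); // a positive return value means "try again after this many ms"
        }
        // a return value of 0 means the log is closed and safe to sort
      }
    }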
Added: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java?rev=1450993&view=auto
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java (added)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java Wed Feb 27 22:09:58 2013
@@ -0,0 +1,10 @@
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public interface LogCloser {
+  public long close(FileSystem fs, Path path) throws IOException;
+}

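The interface is intentionally small: return 0 when the file is closed and safe to sort, or a positive number of milliseconds after which the close should be retried. A hypothetical implementation (not part of this commit) for a file system that never needs write access revoked might look like:

    package org.apache.accumulo.server.master.recovery;

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class NoOpLogCloser implements LogCloser {
      @Override
      public long close(FileSystem fs, Path path) throws IOException {
        return 0; // nothing to revoke; tell RecoveryManager the log is already safe to sort
      }
    }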
Added: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java?rev=1450993&view=auto
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java (added)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java Wed Feb 27 22:09:58 2013
@@ -0,0 +1,28 @@
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
+
+public class MapRLogCloser implements LogCloser {
+  
+  private static Logger log = Logger.getLogger(MapRLogCloser.class);
+  
+  @Override
+  public long close(FileSystem fs, Path path) throws IOException {
+    log.info("Recovering file " + path.toString() + " by changing permission to readonly");
+    FsPermission roPerm = new FsPermission((short) 0444);
+    try {
+      fs.setPermission(path, roPerm);
+      return 0;
+    } catch (IOException ex) {
+      log.error("error recovering lease ", ex);
+      // lets do this again
+      return 1000;
+    }
+  }
+  
+}

Added: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java?rev=1450993&view=auto
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java (added)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java Wed Feb 27 22:09:58 2013
@@ -0,0 +1,166 @@
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+public class RecoveryManager {
+  
+  private static Logger log = Logger.getLogger(RecoveryManager.class);
+  
+  private Map<String,Long> recoveryDelay = new HashMap<String,Long>();
+  private Set<String> closeTasksQueued = new HashSet<String>();
+  private Set<String> sortsQueued = new HashSet<String>();
+  private ScheduledExecutorService executor;
+  private Master master;
+  private ZooCache zooCache;
+  
+  public RecoveryManager(Master master) {
+    this.master = master;
+    executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
+    zooCache = new ZooCache();
+    try {
+      List<String> workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).getWorkQueued();
+      sortsQueued.addAll(workIDs);
+    } catch (Exception e) {
+      log.warn(e, e);
+    }
+  }
+
+  private class LogSortTask implements Runnable {
+    private String filename;
+    private String host;
+    private LogCloser closer;
+    
+    public LogSortTask(LogCloser closer, String host, String filename) {
+      this.closer = closer;
+      this.host = host;
+      this.filename = filename;
+    }
+
+    @Override
+    public void run() {
+      boolean rescheduled = false;
+      try {
+        FileSystem localFs = master.getFileSystem();
+        if (localFs instanceof TraceFileSystem)
+          localFs = ((TraceFileSystem) localFs).getImplementation();
+      
+        long time = closer.close(localFs, getSource(host, filename));
+      
+        if (time > 0) {
+          executor.schedule(this, time, TimeUnit.MILLISECONDS);
+          rescheduled = true;
+        } else {
+          initiateSort(host, filename);
+        }
+        
+      } catch (Exception e) {
+        log.warn("Failed to initiate log sort " + filename, e);
+      } finally {
+        if (!rescheduled) {
+          synchronized (RecoveryManager.this) {
+            closeTasksQueued.remove(filename);
+          }
+        }
+      }
+    }
+    
+  }
+  
+  private void initiateSort(String host, final String file) throws KeeperException, InterruptedException {
+    String source = getSource(host, file).toString();
+    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes());
+    
+    synchronized (this) {
+      sortsQueued.add(file);
+    }
+
+    final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
+    log.info("Created zookeeper entry " + path + " with data " + source);
+  }
+  
+  private Path getSource(String server, String file) {
+    String source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + server + "/" + file;
+    if (server.contains(":")) {
+      // old-style logger log, copied from local file systems by tservers, unsorted into the wal base dir
+      source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + file;
+    }
+    return new Path(source);
+  }
+
+  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
+    boolean recoveryNeeded = false;
+    for (Collection<String> logs : walogs) {
+      for (String walog : logs) {
+        String parts[] = walog.split("/");
+        String host = parts[0];
+        String filename = parts[1];
+        
+        boolean sortQueued;
+        synchronized (this) {
+          sortQueued = sortsQueued.contains(filename);
+        }
+        
+        if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + filename) == null) {
+          synchronized (this) {
+            sortsQueued.remove(filename);
+          }
+        }
+
+        if (master.getFileSystem().exists(new Path(Constants.getRecoveryDir(master.getSystemConfiguration()) + "/" + filename + "/finished"))) {
+          synchronized (this) {
+            closeTasksQueued.remove(filename);
+            recoveryDelay.remove(filename);
+            sortsQueued.remove(filename);
+          }
+          continue;
+        }
+        
+        recoveryNeeded = true;
+        synchronized (this) {
+          if (!closeTasksQueued.contains(filename) && !sortsQueued.contains(filename)) {
+            AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
+            LogCloser closer = Master.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class,
+                new HadoopLogCloser());
+            Long delay = recoveryDelay.get(filename);
+            if (delay == null) {
+              delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
+            } else {
+              delay = Math.min(2 * delay, 1000 * 60 * 5l);
+            }
+
+            log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
+
+            executor.schedule(new LogSortTask(closer, host, filename), delay, TimeUnit.MILLISECONDS);
+            closeTasksQueued.add(filename);
+            recoveryDelay.put(filename, delay);
+          }
+        }
+      }
+    }
+    return recoveryNeeded;
+  }
+}

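The retry-with-backoff behavior mentioned in the commit log is the recoveryDelay bookkeeping in recoverLogs above: the first close attempt for a log is scheduled after master.recovery.delay (10s by default), and each later attempt doubles the delay up to a five minute cap. A small standalone sketch of that arithmetic, using the default value shown in Property.java:

    public class RecoveryBackoffSketch {
      public static void main(String[] args) {
        long delay = 10 * 1000;          // MASTER_RECOVERY_DELAY default, in milliseconds
        final long cap = 1000 * 60 * 5L; // same cap as RecoveryManager.recoverLogs
        for (int attempt = 1; attempt <= 8; attempt++) {
          System.out.println("attempt " + attempt + ": wait " + (delay / 1000) + "s");
          delay = Math.min(2 * delay, cap); // 10s, 20s, 40s, 80s, 160s, 300s, 300s, ...
        }
      }
    }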
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1450993&r1=1450992&r2=1450993&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Wed Feb 27 22:09:58 2013
@@ -104,7 +104,7 @@ public class LogSorter {
       String formerThreadName = Thread.currentThread().getName();
       int part = 0;
       try {
-        
+
         // the following call does not throw an exception if the file/dir does not exist
         fs.delete(new Path(destPath), true);
         
@@ -183,7 +183,7 @@ public class LogSorter {
         Thread.currentThread().setName(formerThreadName);
         try {
           close();
-        } catch (IOException e) {
+        } catch (Exception e) {
           log.error("Error during cleanup sort/copy " + name, e);
         }
         synchronized (this) {

Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java?rev=1450993&r1=1450992&r2=1450993&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java Wed Feb 27 22:09:58 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.server.zookeeper;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -106,6 +107,7 @@ public class DistributedWorkQueue {
                   log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
                 }
                 
+                // TODO always delete this
                 try {
                   zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
                 } catch (Exception e) {
@@ -210,6 +212,12 @@ public class DistributedWorkQueue {
     zoo.putPersistentData(path + "/" + workId, data, NodeExistsPolicy.SKIP);
   }
   
+  public List<String> getWorkQueued() throws KeeperException, InterruptedException {
+    ArrayList<String> children = new ArrayList<String>(zoo.getChildren(path));
+    children.remove(LOCKS_NODE);
+    return children;
+  }
+
   public void waitUntilDone(Set<String> workIDs) throws KeeperException, InterruptedException {
     
     final String condVar = new String("cond");
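The new getWorkQueued() call is what lets a restarted master discover which log sorts are already queued; the RecoveryManager constructor above uses it to seed sortsQueued. A hypothetical standalone usage sketch (the HdfsZooInstance lookup is an assumption, not taken from this commit):

    package org.apache.accumulo.server.zookeeper;

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.accumulo.core.Constants;
    import org.apache.accumulo.core.zookeeper.ZooUtil;
    import org.apache.accumulo.server.client.HdfsZooInstance;

    public class RecoveryQueueInspector {
      public static void main(String[] args) throws Exception {
        String zroot = ZooUtil.getRoot(HdfsZooInstance.getInstance());
        DistributedWorkQueue queue = new DistributedWorkQueue(zroot + Constants.ZRECOVERY);
        Set<String> alreadyQueued = new HashSet<String>(queue.getWorkQueued());
        System.out.println("walog sorts already queued in ZooKeeper: " + alreadyQueued);
      }
    }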