You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/02/27 23:13:09 UTC

svn commit: r1450994 - in /accumulo/trunk: ./ assemble/ core/ core/src/main/java/org/apache/accumulo/core/conf/ examples/ fate/src/main/java/org/apache/accumulo/fate/ fate/src/main/java/org/apache/accumulo/fate/zookeeper/ server/ server/src/main/java/o...

Author: kturner
Date: Wed Feb 27 22:13:09 2013
New Revision: 1450994

URL: http://svn.apache.org/r1450994
Log:
ACCUMULO-1077 fixed a deadlock in walog recovery and made walog recovery retry with backoff; previously, failed log recoveries were not retried

Added:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
      - copied unchanged from r1450993, accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
      - copied unchanged from r1450993, accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
      - copied unchanged from r1450993, accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
      - copied unchanged from r1450993, accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
Removed:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRRecoverLease.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
Modified:
    accumulo/trunk/   (props changed)
    accumulo/trunk/assemble/   (props changed)
    accumulo/trunk/core/   (props changed)
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/trunk/examples/   (props changed)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java   (props changed)
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java   (props changed)
    accumulo/trunk/server/   (props changed)
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
    accumulo/trunk/src/   (props changed)

Propchange: accumulo/trunk/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5:r1450993

Propchange: accumulo/trunk/assemble/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/assemble:r1450993

Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/core:r1450993

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1450994&r1=1450993&r2=1450994&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Wed Feb 27 22:13:09 2013
@@ -105,7 +105,7 @@ public enum Property {
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
   MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
       "When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
-  MASTER_LEASE_RECOVERY_IMPLEMETATION("master.lease.recovery.implementation", "org.apache.accumulo.server.master.recovery.RecoverLease",
+  MASTER_WALOG_CLOSER_IMPLEMETATION("master.walog.closer.implementation", "org.apache.accumulo.server.master.recovery.HadoopLogCloser",
       PropertyType.CLASSNAME, "A class that implements a mechansim to steal write access to a file"),
   MASTER_FATE_THREADPOOL_SIZE("master.fate.threadpool.size", "4", PropertyType.COUNT,
       "The number of threads used to run FAult-Tolerant Executions.  These are primarily table operations like merge."),

Propchange: accumulo/trunk/examples/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/examples:r1450993

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1450993

Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1450993

Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/server:r1450993

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1450994&r1=1450993&r2=1450994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Feb 27 22:13:09 2013
@@ -102,7 +102,7 @@ import org.apache.accumulo.server.conf.S
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 import org.apache.accumulo.server.master.balancer.TabletBalancer;
-import org.apache.accumulo.server.master.recovery.RecoverLease;
+import org.apache.accumulo.server.master.recovery.RecoveryManager;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.CurrentState;
 import org.apache.accumulo.server.master.state.DeadServerList;
@@ -158,7 +158,6 @@ import org.apache.accumulo.start.classlo
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -203,6 +202,7 @@ public class Master implements LiveTServ
   final private SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
   final private EventCoordinator nextEvent = new EventCoordinator();
   final private Object mergeLock = new Object();
+  private RecoveryManager recoveryManager = null;
   
   private ZooLock masterLock = null;
   private TServer clientService = null;
@@ -1344,7 +1344,7 @@ public class Master implements LiveTServ
             
             if (goal == TabletGoalState.HOSTED) {
               if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
-                if (recoverLogs(tls.extent, tls.walogs))
+                if (recoveryManager.recoverLogs(tls.extent, tls.walogs))
                   continue;
               }
               switch (state) {
@@ -2036,39 +2036,13 @@ public class Master implements LiveTServ
     return result;
   }
   
-  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
-    boolean recoveryNeeded = false;
-    for (Collection<String> logs : walogs) {
-      for (String log : logs) {
-        String parts[] = log.split("/");
-        String host = parts[0];
-        String filename = parts[1];
-        if (fs.exists(new Path(Constants.getRecoveryDir(getSystemConfiguration()) + "/" + filename + "/finished"))) {
-          recoveriesInProgress.remove(filename);
-          continue;
-        }
-        recoveryNeeded = true;
-        synchronized (recoveriesInProgress) {
-          if (!recoveriesInProgress.contains(filename)) {
-            Master.log.info("Starting recovery of " + filename + " created for " + host + ", tablet " + extent + " holds a reference");
-            AccumuloConfiguration aconf = getConfiguration().getConfiguration();
-            RecoverLease impl = createInstanceFromPropertyName(aconf, Property.MASTER_LEASE_RECOVERY_IMPLEMETATION, RecoverLease.class, new RecoverLease());
-            impl.init(host, filename);
-            long tid = fate.startTransaction();
-            fate.seedTransaction(tid, impl, true);
-            recoveriesInProgress.add(filename);
-          }
-        }
-      }
-    }
-    return recoveryNeeded;
-  }
-  
   public void run() throws IOException, InterruptedException, KeeperException {
     final String zroot = ZooUtil.getRoot(instance);
     
     getMasterLock(zroot + Constants.ZMASTER_LOCK);
     
+    recoveryManager = new RecoveryManager(this);
+
     TableManager.getInstance().addObserver(this);
     
     StatusThread statusThread = new StatusThread();

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1450994&r1=1450993&r2=1450994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Wed Feb 27 22:13:09 2013
@@ -104,7 +104,7 @@ public class LogSorter {
       String formerThreadName = Thread.currentThread().getName();
       int part = 0;
       try {
-        
+
         // the following call does not throw an exception if the file/dir does not exist
         fs.delete(new Path(destPath), true);
         
@@ -183,7 +183,7 @@ public class LogSorter {
         Thread.currentThread().setName(formerThreadName);
         try {
           close();
-        } catch (IOException e) {
+        } catch (Exception e) {
           log.error("Error during cleanup sort/copy " + name, e);
         }
         synchronized (this) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java?rev=1450994&r1=1450993&r2=1450994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java Wed Feb 27 22:13:09 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.server.zookeeper;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -106,6 +107,7 @@ public class DistributedWorkQueue {
                   log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
                 }
                 
+                // TODO always delete this
                 try {
                   zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
                 } catch (Exception e) {
@@ -210,6 +212,12 @@ public class DistributedWorkQueue {
     zoo.putPersistentData(path + "/" + workId, data, NodeExistsPolicy.SKIP);
   }
   
+  public List<String> getWorkQueued() throws KeeperException, InterruptedException {
+    ArrayList<String> children = new ArrayList<String>(zoo.getChildren(path));
+    children.remove(LOCKS_NODE);
+    return children;
+  }
+
   public void waitUntilDone(Set<String> workIDs) throws KeeperException, InterruptedException {
     
     final String condVar = new String("cond");

Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.5/src:r1450993