You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/06/05 13:49:29 UTC

svn commit: r1346359 - in /accumulo/branches/ACCUMULO-578: ./ core/ core/src/main/java/org/apache/accumulo/core/conf/ core/src/main/java/org/apache/accumulo/core/util/ server/ server/src/main/java/org/apache/accumulo/server/master/ server/src/main/java...

Author: ecn
Date: Tue Jun  5 11:49:28 2012
New Revision: 1346359

URL: http://svn.apache.org/viewvc?rev=1346359&view=rev
Log:
ACCUMULO-578 delay recovery to avoid tserver last-gasp when its lock is deleted, avoid repeating recovery operations

Added:
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java   (with props)
Modified:
    accumulo/branches/ACCUMULO-578/   (props changed)
    accumulo/branches/ACCUMULO-578/core/   (props changed)
    accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/branches/ACCUMULO-578/server/   (props changed)
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
    accumulo/branches/ACCUMULO-578/src/   (props changed)

Propchange: accumulo/branches/ACCUMULO-578/
------------------------------------------------------------------------------
  Merged /accumulo/trunk:r1344413-1345120

Propchange: accumulo/branches/ACCUMULO-578/core/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/core:r1344413-1345120

Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java Tue Jun  5 11:49:28 2012
@@ -64,6 +64,8 @@ public enum Property {
   MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
   MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
+  MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
+      "When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
   
   // properties that are specific to tablet server behavior
   TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"),

Added: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java?rev=1346359&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java (added)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java Tue Jun  5 11:49:28 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A thread pool that runs at most {@code max} tasks concurrently and queues any further tasks in
+ * an unbounded FIFO queue. Threads are created by a {@link NamingThreadFactory} built from the
+ * supplied name, and idle threads (core threads included) exit after one second, so an unused
+ * pool holds no threads.
+ */
+public class SimpleThreadPool extends ThreadPoolExecutor {
+  
+  /**
+   * @param max
+   *          maximum number of threads running tasks at once; values below 1 are treated as 1
+   * @param name
+   *          passed to the {@link NamingThreadFactory} that creates this pool's threads
+   */
+  public SimpleThreadPool(int max, final String name) {
+    // With an unbounded queue, ThreadPoolExecutor never creates more than corePoolSize threads
+    // (extra threads are only added when the queue rejects a task). The core size must therefore
+    // be the real limit; a core size of 0 would run every task on a single thread and ignore max.
+    super(Math.max(1, max), Math.max(1, max), 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+        new NamingThreadFactory(name));
+    // Let core threads time out as well, so an idle pool still shrinks to zero threads.
+    allowCoreThreadTimeOut(true);
+  }
+  
+}

Propchange: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: accumulo/branches/ACCUMULO-578/server/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/server:r1344413-1345120

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java Tue Jun  5 11:49:28 2012
@@ -217,6 +217,8 @@ public class Master implements LiveTServ
   volatile private SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections
       .unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>());
   
+  private Set<String> recoveriesInProgress = Collections.synchronizedSet(new HashSet<String>());
+
   synchronized private MasterState getMasterState() {
     return state;
   }
@@ -1350,7 +1352,7 @@ public class Master implements LiveTServ
             
             if (goal == TabletGoalState.HOSTED) {
               if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
-                if (recoverLogs(tls.walogs))
+                if (recoverLogs(tls.extent, tls.walogs))
                   continue;
               }
               switch (state) {
@@ -2001,7 +2003,7 @@ public class Master implements LiveTServ
         result.put(server, status);
         // TODO maybe remove from bad servers
       } catch (Exception ex) {
-        log.error("unable to get tablet server status " + server + " " + ex.getMessage());
+        log.error("unable to get tablet server status " + server + " " + ex.toString());
         log.debug("unable to get tablet server status " + server, ex);
         if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
           log.warn("attempting to stop " + server);
@@ -2026,7 +2028,7 @@ public class Master implements LiveTServ
     return result;
   }
   
-  public boolean recoverLogs(Collection<Collection<String>> walogs) throws IOException {
+  public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
     boolean recoveryNeeded = false;
     for (Collection<String> logs : walogs) {
       for (String log : logs) {
@@ -2034,17 +2036,17 @@ public class Master implements LiveTServ
         String host = parts[0];
         String filename = parts[1];
         if (fs.exists(new Path(Constants.getRecoveryDir(getSystemConfiguration()) + "/" + filename + "/finished"))) {
+          recoveriesInProgress.remove(filename);
           continue;
         }
         recoveryNeeded = true;
-        String zPath = ZooUtil.getRoot(instance) + Constants.ZRECOVERY + "/" + filename;
-        try {
-          if (!ZooReaderWriter.getInstance().exists(zPath)) {
+        synchronized (recoveriesInProgress) {
+          if (!recoveriesInProgress.contains(filename)) {
+            Master.log.info("Starting recovery of " + filename + " created for " + host + ", tablet " + extent + " holds a reference");
             long tid = fate.startTransaction();
             fate.seedTransaction(tid, new RecoverLease(host, filename), true);
+            recoveriesInProgress.add(filename);
           }
-        } catch (Exception ex) {
-          Master.log.info("Unable to check the recovery entry " + filename + ", will retry");
         }
       }
     }
@@ -2291,4 +2293,7 @@ public class Master implements LiveTServ
     return this.fs;
   }
 
+  /**
+   * Records that a recovery of the given write-ahead log file is in progress, so that
+   * recoverLogs skips seeding another RecoverLease transaction for it. The entry is removed by
+   * recoverLogs once the file's recovery "finished" marker exists. This is invoked by the recovery
+   * repos (RecoverLease.isReady and SubmitFileForRecovery.call), presumably so the in-memory set is
+   * repopulated when those FATE transactions resume after a master restart -- TODO confirm.
+   *
+   * @param file
+   *          the write-ahead log file name (the part after the host in a walog entry)
+   */
+  public void updateRecoveryInProgress(String file) {
+    recoveriesInProgress.add(file);
+  }
 }

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java Tue Jun  5 11:49:28 2012
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.maste
 import java.io.IOException;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.server.fate.Repo;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.tableOps.MasterRepo;
@@ -34,10 +35,12 @@ public class RecoverLease extends Master
 
   private String server;
   private String file;
+  private long start;
 
   public RecoverLease(String server, String file) {
     this.server = server;
     this.file = file;
+    this.start = System.currentTimeMillis();
   }
   
   public static Path getSource(Master master, String server, String file) {
@@ -55,6 +58,10 @@ public class RecoverLease extends Master
 
   @Override
   public long isReady(long tid, Master master) throws Exception {
+    master.updateRecoveryInProgress(file);
+    long diff = System.currentTimeMillis() - start;
+    if (diff < master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY))
+      return Math.max(diff, 0);
     FileSystem fs = master.getFileSystem();
     if (fs.exists(getSource(master)))
       return 0;

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java Tue Jun  5 11:49:28 2012
@@ -46,6 +46,7 @@ public class SubmitFileForRecovery exten
 
   @Override
   public Repo<Master> call(long tid, final Master master) throws Exception {
+    master.updateRecoveryInProgress(file);
     String source = RecoverLease.getSource(master, server, file).toString();
     ZooReaderWriter zoo = ZooReaderWriter.getInstance();
     final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Tue Jun  5 11:49:28 2012
@@ -2055,11 +2055,9 @@ public class TabletServer extends Abstra
       nextFile:
       for (String filename : filenames) {
         for (String logger : loggers) {
-          log.debug("logger " + logger + " filename " + filename);
           if (logger.contains(filename))
             continue nextFile;
         }
-        // this check is not strictly necessary
         List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
         synchronized (onlineTablets) {
           onlineTabletsCopy.addAll(onlineTablets.values());
@@ -2067,21 +2065,27 @@ public class TabletServer extends Abstra
         for (Tablet tablet : onlineTabletsCopy) {
           for (String current : tablet.getCurrentLogs()) {
             if (current.contains(filename)) {
-              log.error("Attempting to remove a write-ahead log that is in use.  This should never happen!");
               log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
               continue nextFile;
             }
           }
         }
         try {
+          String source = logDir + "/" + filename;
           if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
-            log.info("Archiving walog " + filename);
-            fs.rename(new Path(logDir, filename), new Path(Constants.getBaseDir(acuConf) + "/walogArchive", filename));
+            String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
+            fs.mkdirs(new Path(walogArchive));
+            String dest = walogArchive + "/" + filename;
+            log.info("Archiving walog " + source + " to " + dest);
+            if (!fs.rename(new Path(source), new Path(dest)))
+              log.error("rename is unsuccessful");
           } else {
             log.info("Deleting walog " + filename);
-            fs.delete(new Path(logDir, filename), true);
-            log.info("Deleting any recovery version of the log ");
-            fs.delete(new Path(Constants.getRecoveryDir(acuConf), filename), true);
+            if (!fs.delete(new Path(source), true))
+              log.warn("Failed to delete walog " + source);
+            if (fs.delete(new Path(Constants.getRecoveryDir(acuConf), filename), true))
+              log.info("Deleted any recovery log " + filename);
+
           }
         } catch (IOException e) {
           log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);

Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Tue Jun  5 11:49:28 2012
@@ -260,6 +260,7 @@ public class LogSorter {
             log.debug("got the lock, but thread pool is busy; released the lock on " + child);
             continue;
           }
+          log.info("got lock for " + child);
           byte[] contents = zoo.getData(childPath, null);
           String destination = Constants.getRecoveryDir(conf) + "/" + child;
           startSort(new String(contents), destination, new LogSortNotifier() {

Propchange: accumulo/branches/ACCUMULO-578/src/
------------------------------------------------------------------------------
  Merged /accumulo/trunk/src:r1344413-1345120