You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/02/27 23:09:59 UTC
svn commit: r1450993 - in /accumulo/branches/1.5:
core/src/main/java/org/apache/accumulo/core/conf/
server/src/main/java/org/apache/accumulo/server/master/
server/src/main/java/org/apache/accumulo/server/master/recovery/
server/src/main/java/org/apache...
Author: kturner
Date: Wed Feb 27 22:09:58 2013
New Revision: 1450993
URL: http://svn.apache.org/r1450993
Log:
ACCUMULO-1077 Fixed a deadlock in walog recovery and made walog recovery retry with backoff; previously, failed log recoveries were not retried.
Added:
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
Removed:
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRRecoverLease.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
Modified:
accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
Modified: accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1450993&r1=1450992&r2=1450993&view=diff
==============================================================================
--- accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/1.5/core/src/main/java/org/apache/accumulo/core/conf/Property.java Wed Feb 27 22:09:58 2013
@@ -105,7 +105,7 @@ public enum Property {
MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
"When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
- MASTER_LEASE_RECOVERY_IMPLEMETATION("master.lease.recovery.implementation", "org.apache.accumulo.server.master.recovery.RecoverLease",
+ MASTER_WALOG_CLOSER_IMPLEMETATION("master.walog.closer.implementation", "org.apache.accumulo.server.master.recovery.HadoopLogCloser",
PropertyType.CLASSNAME, "A class that implements a mechansim to steal write access to a file"),
MASTER_FATE_THREADPOOL_SIZE("master.fate.threadpool.size", "4", PropertyType.COUNT,
"The number of threads used to run FAult-Tolerant Executions. These are primarily table operations like merge."),
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1450993&r1=1450992&r2=1450993&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Feb 27 22:09:58 2013
@@ -102,7 +102,7 @@ import org.apache.accumulo.server.conf.S
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
import org.apache.accumulo.server.master.balancer.TabletBalancer;
-import org.apache.accumulo.server.master.recovery.RecoverLease;
+import org.apache.accumulo.server.master.recovery.RecoveryManager;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.CurrentState;
import org.apache.accumulo.server.master.state.DeadServerList;
@@ -158,7 +158,6 @@ import org.apache.accumulo.start.classlo
import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
import org.apache.accumulo.trace.thrift.TInfo;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -203,6 +202,7 @@ public class Master implements LiveTServ
final private SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
final private EventCoordinator nextEvent = new EventCoordinator();
final private Object mergeLock = new Object();
+ private RecoveryManager recoveryManager = null;
private ZooLock masterLock = null;
private TServer clientService = null;
@@ -1344,7 +1344,7 @@ public class Master implements LiveTServ
if (goal == TabletGoalState.HOSTED) {
if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
- if (recoverLogs(tls.extent, tls.walogs))
+ if (recoveryManager.recoverLogs(tls.extent, tls.walogs))
continue;
}
switch (state) {
@@ -2036,39 +2036,13 @@ public class Master implements LiveTServ
return result;
}
- public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
- boolean recoveryNeeded = false;
- for (Collection<String> logs : walogs) {
- for (String log : logs) {
- String parts[] = log.split("/");
- String host = parts[0];
- String filename = parts[1];
- if (fs.exists(new Path(Constants.getRecoveryDir(getSystemConfiguration()) + "/" + filename + "/finished"))) {
- recoveriesInProgress.remove(filename);
- continue;
- }
- recoveryNeeded = true;
- synchronized (recoveriesInProgress) {
- if (!recoveriesInProgress.contains(filename)) {
- Master.log.info("Starting recovery of " + filename + " created for " + host + ", tablet " + extent + " holds a reference");
- AccumuloConfiguration aconf = getConfiguration().getConfiguration();
- RecoverLease impl = createInstanceFromPropertyName(aconf, Property.MASTER_LEASE_RECOVERY_IMPLEMETATION, RecoverLease.class, new RecoverLease());
- impl.init(host, filename);
- long tid = fate.startTransaction();
- fate.seedTransaction(tid, impl, true);
- recoveriesInProgress.add(filename);
- }
- }
- }
- }
- return recoveryNeeded;
- }
-
public void run() throws IOException, InterruptedException, KeeperException {
final String zroot = ZooUtil.getRoot(instance);
getMasterLock(zroot + Constants.ZMASTER_LOCK);
+ recoveryManager = new RecoveryManager(this);
+
TableManager.getInstance().addObserver(this);
StatusThread statusThread = new StatusThread();
Added: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java?rev=1450993&view=auto
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java (added)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java Wed Feb 27 22:09:58 2013
@@ -0,0 +1,40 @@
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.log4j.Logger;
+
+public class HadoopLogCloser implements LogCloser {
+
+ private static Logger log = Logger.getLogger(HadoopLogCloser.class);
+
+ @Override
+ public long close(FileSystem fs, Path source) throws IOException {
+
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ try {
+ if (!dfs.recoverLease(source)) {
+ log.info("Waiting for file to be closed " + source.toString());
+ return 1000;
+ }
+ log.info("Recovered lease on " + source.toString());
+ return 0;
+ } catch (IOException ex) {
+ log.warn("Error recovery lease on " + source.toString(), ex);
+ }
+ } else if (fs instanceof LocalFileSystem) {
+ // ignore
+ } else {
+ throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
+ }
+ fs.append(source).close();
+ log.info("Recovered lease on " + source.toString() + " using append");
+ return 0;
+ }
+
+}
Added: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java?rev=1450993&view=auto
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java (added)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java Wed Feb 27 22:09:58 2013
@@ -0,0 +1,10 @@
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Strategy used during write-ahead log recovery to get a log file closed before it is sorted.
+ */
+public interface LogCloser {
+  /**
+   * Attempts to close the given file.
+   *
+   * @param fs
+   *          file system holding the file
+   * @param path
+   *          file to close
+   * @return 0 once the file is closed; a positive value is the number of milliseconds the caller should wait before calling this method again
+   * @throws IOException
+   *           if the attempt fails
+   */
+  long close(FileSystem fs, Path path) throws IOException;
+}
Added: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java?rev=1450993&view=auto
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java (added)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java Wed Feb 27 22:09:58 2013
@@ -0,0 +1,28 @@
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
+
+public class MapRLogCloser implements LogCloser {
+
+ private static Logger log = Logger.getLogger(MapRLogCloser.class);
+
+ @Override
+ public long close(FileSystem fs, Path path) throws IOException {
+ log.info("Recovering file " + path.toString() + " by changing permission to readonly");
+ FsPermission roPerm = new FsPermission((short) 0444);
+ try {
+ fs.setPermission(path, roPerm);
+ return 0;
+ } catch (IOException ex) {
+ log.error("error recovering lease ", ex);
+ // lets do this again
+ return 1000;
+ }
+ }
+
+}
Added: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java?rev=1450993&view=auto
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java (added)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java Wed Feb 27 22:09:58 2013
@@ -0,0 +1,166 @@
+package org.apache.accumulo.server.master.recovery;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+
+public class RecoveryManager {
+
+ private static Logger log = Logger.getLogger(RecoveryManager.class);
+
+ private Map<String,Long> recoveryDelay = new HashMap<String,Long>();
+ private Set<String> closeTasksQueued = new HashSet<String>();
+ private Set<String> sortsQueued = new HashSet<String>();
+ private ScheduledExecutorService executor;
+ private Master master;
+ private ZooCache zooCache;
+
+ public RecoveryManager(Master master) {
+ this.master = master;
+ executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter "));
+ zooCache = new ZooCache();
+ try {
+ List<String> workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).getWorkQueued();
+ sortsQueued.addAll(workIDs);
+ } catch (Exception e) {
+ log.warn(e, e);
+ }
+ }
+
+ private class LogSortTask implements Runnable {
+ private String filename;
+ private String host;
+ private LogCloser closer;
+
+ public LogSortTask(LogCloser closer, String host, String filename) {
+ this.closer = closer;
+ this.host = host;
+ this.filename = filename;
+ }
+
+ @Override
+ public void run() {
+ boolean rescheduled = false;
+ try {
+ FileSystem localFs = master.getFileSystem();
+ if (localFs instanceof TraceFileSystem)
+ localFs = ((TraceFileSystem) localFs).getImplementation();
+
+ long time = closer.close(localFs, getSource(host, filename));
+
+ if (time > 0) {
+ executor.schedule(this, time, TimeUnit.MILLISECONDS);
+ rescheduled = true;
+ } else {
+ initiateSort(host, filename);
+ }
+
+ } catch (Exception e) {
+ log.warn("Failed to initiate log sort " + filename, e);
+ } finally {
+ if (!rescheduled) {
+ synchronized (RecoveryManager.this) {
+ closeTasksQueued.remove(filename);
+ }
+ }
+ }
+ }
+
+ }
+
+ private void initiateSort(String host, final String file) throws KeeperException, InterruptedException {
+ String source = getSource(host, file).toString();
+ new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes());
+
+ synchronized (this) {
+ sortsQueued.add(file);
+ }
+
+ final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
+ log.info("Created zookeeper entry " + path + " with data " + source);
+ }
+
+ private Path getSource(String server, String file) {
+ String source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + server + "/" + file;
+ if (server.contains(":")) {
+ // old-style logger log, copied from local file systems by tservers, unsorted into the wal base dir
+ source = Constants.getWalDirectory(master.getSystemConfiguration()) + "/" + file;
+ }
+ return new Path(source);
+ }
+
+ public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
+ boolean recoveryNeeded = false;
+ for (Collection<String> logs : walogs) {
+ for (String walog : logs) {
+ String parts[] = walog.split("/");
+ String host = parts[0];
+ String filename = parts[1];
+
+ boolean sortQueued;
+ synchronized (this) {
+ sortQueued = sortsQueued.contains(filename);
+ }
+
+ if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + filename) == null) {
+ synchronized (this) {
+ sortsQueued.remove(filename);
+ }
+ }
+
+ if (master.getFileSystem().exists(new Path(Constants.getRecoveryDir(master.getSystemConfiguration()) + "/" + filename + "/finished"))) {
+ synchronized (this) {
+ closeTasksQueued.remove(filename);
+ recoveryDelay.remove(filename);
+ sortsQueued.remove(filename);
+ }
+ continue;
+ }
+
+ recoveryNeeded = true;
+ synchronized (this) {
+ if (!closeTasksQueued.contains(filename) && !sortsQueued.contains(filename)) {
+ AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
+ LogCloser closer = Master.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class,
+ new HadoopLogCloser());
+ Long delay = recoveryDelay.get(filename);
+ if (delay == null) {
+ delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
+ } else {
+ delay = Math.min(2 * delay, 1000 * 60 * 5l);
+ }
+
+ log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
+
+ executor.schedule(new LogSortTask(closer, host, filename), delay, TimeUnit.MILLISECONDS);
+ closeTasksQueued.add(filename);
+ recoveryDelay.put(filename, delay);
+ }
+ }
+ }
+ }
+ return recoveryNeeded;
+ }
+}
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1450993&r1=1450992&r2=1450993&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Wed Feb 27 22:09:58 2013
@@ -104,7 +104,7 @@ public class LogSorter {
String formerThreadName = Thread.currentThread().getName();
int part = 0;
try {
-
+
// the following call does not throw an exception if the file/dir does not exist
fs.delete(new Path(destPath), true);
@@ -183,7 +183,7 @@ public class LogSorter {
Thread.currentThread().setName(formerThreadName);
try {
close();
- } catch (IOException e) {
+ } catch (Exception e) {
log.error("Error during cleanup sort/copy " + name, e);
}
synchronized (this) {
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java?rev=1450993&r1=1450992&r2=1450993&view=diff
==============================================================================
--- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (original)
+++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java Wed Feb 27 22:09:58 2013
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.server.zookeeper;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -106,6 +107,7 @@ public class DistributedWorkQueue {
log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
}
+ // TODO always delete this
try {
zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
} catch (Exception e) {
@@ -210,6 +212,12 @@ public class DistributedWorkQueue {
zoo.putPersistentData(path + "/" + workId, data, NodeExistsPolicy.SKIP);
}
+  /**
+   * Lists the children of this queue's zookeeper path, leaving out the locks node.
+   *
+   * @return a new list of the child names other than {@code LOCKS_NODE}
+   */
+  public List<String> getWorkQueued() throws KeeperException, InterruptedException {
+    List<String> workIds = new ArrayList<String>(zoo.getChildren(path));
+    workIds.remove(LOCKS_NODE);
+    return workIds;
+  }
+
public void waitUntilDone(Set<String> workIDs) throws KeeperException, InterruptedException {
final String condVar = new String("cond");