You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/02/27 23:13:09 UTC
svn commit: r1450994 - in /accumulo/trunk: ./ assemble/ core/
core/src/main/java/org/apache/accumulo/core/conf/ examples/
fate/src/main/java/org/apache/accumulo/fate/
fate/src/main/java/org/apache/accumulo/fate/zookeeper/ server/
server/src/main/java/o...
Author: kturner
Date: Wed Feb 27 22:13:09 2013
New Revision: 1450994
URL: http://svn.apache.org/r1450994
Log:
ACCUMULO-1077 fixed deadlock w/ walog recovery and made walog recovery retry w/ backoff. Previously, failed log recoveries were not retried.
Added:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
- copied unchanged from r1450993, accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
- copied unchanged from r1450993, accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
- copied unchanged from r1450993, accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
- copied unchanged from r1450993, accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
Removed:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRRecoverLease.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
Modified:
accumulo/trunk/ (props changed)
accumulo/trunk/assemble/ (props changed)
accumulo/trunk/core/ (props changed)
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
accumulo/trunk/examples/ (props changed)
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (props changed)
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (props changed)
accumulo/trunk/server/ (props changed)
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
accumulo/trunk/src/ (props changed)
Propchange: accumulo/trunk/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5:r1450993
Propchange: accumulo/trunk/assemble/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/assemble:r1450993
Propchange: accumulo/trunk/core/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/core:r1450993
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1450994&r1=1450993&r2=1450994&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Wed Feb 27 22:13:09 2013
@@ -105,7 +105,7 @@ public enum Property {
MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
"When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
- MASTER_LEASE_RECOVERY_IMPLEMETATION("master.lease.recovery.implementation", "org.apache.accumulo.server.master.recovery.RecoverLease",
+ MASTER_WALOG_CLOSER_IMPLEMETATION("master.walog.closer.implementation", "org.apache.accumulo.server.master.recovery.HadoopLogCloser",
PropertyType.CLASSNAME, "A class that implements a mechansim to steal write access to a file"),
MASTER_FATE_THREADPOOL_SIZE("master.fate.threadpool.size", "4", PropertyType.COUNT,
"The number of threads used to run FAult-Tolerant Executions. These are primarily table operations like merge."),
Propchange: accumulo/trunk/examples/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/examples:r1450993
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1450993
Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1450993
Propchange: accumulo/trunk/server/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/server:r1450993
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1450994&r1=1450993&r2=1450994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Feb 27 22:13:09 2013
@@ -102,7 +102,7 @@ import org.apache.accumulo.server.conf.S
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
import org.apache.accumulo.server.master.balancer.TabletBalancer;
-import org.apache.accumulo.server.master.recovery.RecoverLease;
+import org.apache.accumulo.server.master.recovery.RecoveryManager;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.CurrentState;
import org.apache.accumulo.server.master.state.DeadServerList;
@@ -158,7 +158,6 @@ import org.apache.accumulo.start.classlo
import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
import org.apache.accumulo.trace.thrift.TInfo;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -203,6 +202,7 @@ public class Master implements LiveTServ
final private SortedMap<KeyExtent,TServerInstance> migrations = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,TServerInstance>());
final private EventCoordinator nextEvent = new EventCoordinator();
final private Object mergeLock = new Object();
+ private RecoveryManager recoveryManager = null;
private ZooLock masterLock = null;
private TServer clientService = null;
@@ -1344,7 +1344,7 @@ public class Master implements LiveTServ
if (goal == TabletGoalState.HOSTED) {
if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
- if (recoverLogs(tls.extent, tls.walogs))
+ if (recoveryManager.recoverLogs(tls.extent, tls.walogs))
continue;
}
switch (state) {
@@ -2036,39 +2036,13 @@ public class Master implements LiveTServ
return result;
}
- public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
- boolean recoveryNeeded = false;
- for (Collection<String> logs : walogs) {
- for (String log : logs) {
- String parts[] = log.split("/");
- String host = parts[0];
- String filename = parts[1];
- if (fs.exists(new Path(Constants.getRecoveryDir(getSystemConfiguration()) + "/" + filename + "/finished"))) {
- recoveriesInProgress.remove(filename);
- continue;
- }
- recoveryNeeded = true;
- synchronized (recoveriesInProgress) {
- if (!recoveriesInProgress.contains(filename)) {
- Master.log.info("Starting recovery of " + filename + " created for " + host + ", tablet " + extent + " holds a reference");
- AccumuloConfiguration aconf = getConfiguration().getConfiguration();
- RecoverLease impl = createInstanceFromPropertyName(aconf, Property.MASTER_LEASE_RECOVERY_IMPLEMETATION, RecoverLease.class, new RecoverLease());
- impl.init(host, filename);
- long tid = fate.startTransaction();
- fate.seedTransaction(tid, impl, true);
- recoveriesInProgress.add(filename);
- }
- }
- }
- }
- return recoveryNeeded;
- }
-
public void run() throws IOException, InterruptedException, KeeperException {
final String zroot = ZooUtil.getRoot(instance);
getMasterLock(zroot + Constants.ZMASTER_LOCK);
+ recoveryManager = new RecoveryManager(this);
+
TableManager.getInstance().addObserver(this);
StatusThread statusThread = new StatusThread();
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1450994&r1=1450993&r2=1450994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Wed Feb 27 22:13:09 2013
@@ -104,7 +104,7 @@ public class LogSorter {
String formerThreadName = Thread.currentThread().getName();
int part = 0;
try {
-
+
// the following call does not throw an exception if the file/dir does not exist
fs.delete(new Path(destPath), true);
@@ -183,7 +183,7 @@ public class LogSorter {
Thread.currentThread().setName(formerThreadName);
try {
close();
- } catch (IOException e) {
+ } catch (Exception e) {
log.error("Error during cleanup sort/copy " + name, e);
}
synchronized (this) {
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java?rev=1450994&r1=1450993&r2=1450994&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java Wed Feb 27 22:13:09 2013
@@ -16,6 +16,7 @@
*/
package org.apache.accumulo.server.zookeeper;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -106,6 +107,7 @@ public class DistributedWorkQueue {
log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
}
+ // TODO always delete this
try {
zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
} catch (Exception e) {
@@ -210,6 +212,12 @@ public class DistributedWorkQueue {
zoo.putPersistentData(path + "/" + workId, data, NodeExistsPolicy.SKIP);
}
+ public List<String> getWorkQueued() throws KeeperException, InterruptedException {
+ ArrayList<String> children = new ArrayList<String>(zoo.getChildren(path));
+ children.remove(LOCKS_NODE);
+ return children;
+ }
+
public void waitUntilDone(Set<String> workIDs) throws KeeperException, InterruptedException {
final String condVar = new String("cond");
Propchange: accumulo/trunk/src/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/src:r1450993