You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2012/06/05 13:49:29 UTC
svn commit: r1346359 - in /accumulo/branches/ACCUMULO-578: ./ core/
core/src/main/java/org/apache/accumulo/core/conf/
core/src/main/java/org/apache/accumulo/core/util/ server/
server/src/main/java/org/apache/accumulo/server/master/
server/src/main/java...
Author: ecn
Date: Tue Jun 5 11:49:28 2012
New Revision: 1346359
URL: http://svn.apache.org/viewvc?rev=1346359&view=rev
Log:
ACCUMULO-578 delay recovery to avoid tserver last-gasp when its lock is deleted, avoid repeating recovery operations
Added:
accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java (with props)
Modified:
accumulo/branches/ACCUMULO-578/ (props changed)
accumulo/branches/ACCUMULO-578/core/ (props changed)
accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java
accumulo/branches/ACCUMULO-578/server/ (props changed)
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
accumulo/branches/ACCUMULO-578/src/ (props changed)
Propchange: accumulo/branches/ACCUMULO-578/
------------------------------------------------------------------------------
Merged /accumulo/trunk:r1344413-1345120
Propchange: accumulo/branches/ACCUMULO-578/core/
------------------------------------------------------------------------------
Merged /accumulo/trunk/core:r1344413-1345120
Modified: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/conf/Property.java Tue Jun 5 11:49:28 2012
@@ -64,6 +64,8 @@ public enum Property {
MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk-import."),
MASTER_MINTHREADS("master.server.threads.minimum", "2", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
+ MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
+ "When a tablet server's lock is deleted, it takes time for it to completely quit. This delay gives it time before log recoveries begin."),
// properties that are specific to tablet server behavior
TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"),
Added: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java?rev=1346359&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java (added)
+++ accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java Tue Jun 5 11:49:28 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Thread pool that runs queued tasks on up to {@code max} threads (values below 1 are treated as 1) created by a {@code NamingThreadFactory} for {@code name}; excess tasks wait in an unbounded queue.
+ */
+public class SimpleThreadPool extends ThreadPoolExecutor {
+
+ public SimpleThreadPool(int max, final String name) {
+ super(Math.max(1, max), Math.max(1, max), 1l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(name)); // core == max: with an unbounded queue the executor never grows past its core size
+ allowCoreThreadTimeOut(true); // idle threads still exit after the 1s keep-alive, so an unused pool holds no threads
+ }
+}
Propchange: accumulo/branches/ACCUMULO-578/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: accumulo/branches/ACCUMULO-578/server/
------------------------------------------------------------------------------
Merged /accumulo/trunk/server:r1344413-1345120
Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/Master.java Tue Jun 5 11:49:28 2012
@@ -217,6 +217,8 @@ public class Master implements LiveTServ
volatile private SortedMap<TServerInstance,TabletServerStatus> tserverStatus = Collections
.unmodifiableSortedMap(new TreeMap<TServerInstance,TabletServerStatus>());
+ private Set<String> recoveriesInProgress = Collections.synchronizedSet(new HashSet<String>());
+
synchronized private MasterState getMasterState() {
return state;
}
@@ -1350,7 +1352,7 @@ public class Master implements LiveTServ
if (goal == TabletGoalState.HOSTED) {
if (state != TabletState.HOSTED && !tls.walogs.isEmpty()) {
- if (recoverLogs(tls.walogs))
+ if (recoverLogs(tls.extent, tls.walogs))
continue;
}
switch (state) {
@@ -2001,7 +2003,7 @@ public class Master implements LiveTServ
result.put(server, status);
// TODO maybe remove from bad servers
} catch (Exception ex) {
- log.error("unable to get tablet server status " + server + " " + ex.getMessage());
+ log.error("unable to get tablet server status " + server + " " + ex.toString());
log.debug("unable to get tablet server status " + server, ex);
if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
log.warn("attempting to stop " + server);
@@ -2026,7 +2028,7 @@ public class Master implements LiveTServ
return result;
}
- public boolean recoverLogs(Collection<Collection<String>> walogs) throws IOException {
+ public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
boolean recoveryNeeded = false;
for (Collection<String> logs : walogs) {
for (String log : logs) {
@@ -2034,17 +2036,17 @@ public class Master implements LiveTServ
String host = parts[0];
String filename = parts[1];
if (fs.exists(new Path(Constants.getRecoveryDir(getSystemConfiguration()) + "/" + filename + "/finished"))) {
+ recoveriesInProgress.remove(filename);
continue;
}
recoveryNeeded = true;
- String zPath = ZooUtil.getRoot(instance) + Constants.ZRECOVERY + "/" + filename;
- try {
- if (!ZooReaderWriter.getInstance().exists(zPath)) {
+ synchronized (recoveriesInProgress) {
+ if (!recoveriesInProgress.contains(filename)) {
+ Master.log.info("Starting recovery of " + filename + " created for " + host + ", tablet " + extent + " holds a reference");
long tid = fate.startTransaction();
fate.seedTransaction(tid, new RecoverLease(host, filename), true);
+ recoveriesInProgress.add(filename);
}
- } catch (Exception ex) {
- Master.log.info("Unable to check the recovery entry " + filename + ", will retry");
}
}
}
@@ -2291,4 +2293,7 @@ public class Master implements LiveTServ
return this.fs;
}
+ public void updateRecoveryInProgress(String file) {
+ recoveriesInProgress.add(file);
+ }
}
Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoverLease.java Tue Jun 5 11:49:28 2012
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.maste
import java.io.IOException;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.server.fate.Repo;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.tableOps.MasterRepo;
@@ -34,10 +35,12 @@ public class RecoverLease extends Master
private String server;
private String file;
+ private long start;
public RecoverLease(String server, String file) {
this.server = server;
this.file = file;
+ this.start = System.currentTimeMillis();
}
public static Path getSource(Master master, String server, String file) {
@@ -55,6 +58,10 @@ public class RecoverLease extends Master
@Override
public long isReady(long tid, Master master) throws Exception {
+ master.updateRecoveryInProgress(file);
+ long diff = System.currentTimeMillis() - start;
+ if (diff < master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY))
+ return Math.max(diff, 0);
FileSystem fs = master.getFileSystem();
if (fs.exists(getSource(master)))
return 0;
Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java Tue Jun 5 11:49:28 2012
@@ -46,6 +46,7 @@ public class SubmitFileForRecovery exten
@Override
public Repo<Master> call(long tid, final Master master) throws Exception {
+ master.updateRecoveryInProgress(file);
String source = RecoverLease.getSource(master, server, file).toString();
ZooReaderWriter zoo = ZooReaderWriter.getInstance();
final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Tue Jun 5 11:49:28 2012
@@ -2055,11 +2055,9 @@ public class TabletServer extends Abstra
nextFile:
for (String filename : filenames) {
for (String logger : loggers) {
- log.debug("logger " + logger + " filename " + filename);
if (logger.contains(filename))
continue nextFile;
}
- // this check is not strictly necessary
List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>();
synchronized (onlineTablets) {
onlineTabletsCopy.addAll(onlineTablets.values());
@@ -2067,21 +2065,27 @@ public class TabletServer extends Abstra
for (Tablet tablet : onlineTabletsCopy) {
for (String current : tablet.getCurrentLogs()) {
if (current.contains(filename)) {
- log.error("Attempting to remove a write-ahead log that is in use. This should never happen!");
log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent());
continue nextFile;
}
}
}
try {
+ String source = logDir + "/" + filename;
if (acuConf.getBoolean(Property.TSERV_ARCHIVE_WALOGS)) {
- log.info("Archiving walog " + filename);
- fs.rename(new Path(logDir, filename), new Path(Constants.getBaseDir(acuConf) + "/walogArchive", filename));
+ String walogArchive = Constants.getBaseDir(acuConf) + "/walogArchive";
+ fs.mkdirs(new Path(walogArchive));
+ String dest = walogArchive + "/" + filename;
+ log.info("Archiving walog " + source + " to " + dest);
+ if (!fs.rename(new Path(source), new Path(dest)))
+ log.error("rename is unsuccessful");
} else {
log.info("Deleting walog " + filename);
- fs.delete(new Path(logDir, filename), true);
- log.info("Deleting any recovery version of the log ");
- fs.delete(new Path(Constants.getRecoveryDir(acuConf), filename), true);
+ if (!fs.delete(new Path(source), true))
+ log.warn("Failed to delete walog " + source);
+ if (fs.delete(new Path(Constants.getRecoveryDir(acuConf), filename), true))
+ log.info("Deleted any recovery log " + filename);
+
}
} catch (IOException e) {
log.warn("Error attempting to delete write-ahead log " + filename + ": " + e);
Modified: accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1346359&r1=1346358&r2=1346359&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-578/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Tue Jun 5 11:49:28 2012
@@ -260,6 +260,7 @@ public class LogSorter {
log.debug("got the lock, but thread pool is busy; released the lock on " + child);
continue;
}
+ log.info("got lock for " + child);
byte[] contents = zoo.getData(childPath, null);
String destination = Constants.getRecoveryDir(conf) + "/" + child;
startSort(new String(contents), destination, new LogSortNotifier() {
Propchange: accumulo/branches/ACCUMULO-578/src/
------------------------------------------------------------------------------
Merged /accumulo/trunk/src:r1344413-1345120