You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/07/05 18:19:06 UTC
svn commit: r1357721 - in /accumulo/trunk/server/src/main/java/org/apache/accumulo/server: master/recovery/SubmitFileForRecovery.java tabletserver/TabletServer.java tabletserver/log/LogSorter.java
Author: kturner
Date: Thu Jul 5 16:19:05 2012
New Revision: 1357721
URL: http://svn.apache.org/viewvc?rev=1357721&view=rev
Log:
ACCUMULO-409 Modified log recovery to use new DistributedWorkQueue
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java?rev=1357721&r1=1357720&r2=1357721&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java Thu Jul 5 16:19:05 2012
@@ -21,9 +21,9 @@ import java.io.IOException;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.tableOps.MasterRepo;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -48,18 +48,19 @@ public class SubmitFileForRecovery exten
public Repo<Master> call(long tid, final Master master) throws Exception {
master.updateRecoveryInProgress(file);
String source = RecoverLease.getSource(master, server, file).toString();
+ new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes());
+
ZooReaderWriter zoo = ZooReaderWriter.getInstance();
final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
- zoo.putPersistentData(path, source.getBytes(), NodeExistsPolicy.SKIP);
log.info("Created zookeeper entry " + path + " with data " + source);
zoo.exists(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
- case NodeDataChanged:
+ case NodeDeleted:
log.info("noticed recovery entry for " + file + " was removed");
FileSystem fs = master.getFileSystem();
- Path finished = new Path(Constants.getRecoveryDir(master.getSystemConfiguration()), "finished");
+ Path finished = new Path(Constants.getRecoveryDir(master.getSystemConfiguration()) + "/" + file, "finished");
try {
if (fs.exists(finished))
log.info("log recovery for " + file + " successful");
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1357721&r1=1357720&r2=1357721&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Thu Jul 5 16:19:05 2012
@@ -51,7 +51,6 @@ import java.util.concurrent.ArrayBlockin
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
@@ -126,9 +125,9 @@ import org.apache.accumulo.core.util.Cac
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.Stat;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -2714,22 +2713,22 @@ public class TabletServer extends Abstra
}
clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
announceExistence();
- try {
- logSorter.startWatchingForRecoveryLogs(getClientAddressString());
- } catch (Exception ex) {
- log.error("Error setting watches for recoveries");
- throw new RuntimeException(ex);
- }
- ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
- new NamingThreadFactory("distributed work queue"));
+ ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
- bulkFailedCopyQ = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ);
+ bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ);
try {
bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
} catch (Exception e1) {
throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
}
+
+ try {
+ logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
+ } catch (Exception ex) {
+ log.error("Error setting watches for recoveries");
+ throw new RuntimeException(ex);
+ }
try {
OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1357721&r1=1357720&r2=1357721&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Thu Jul 5 16:19:05 2012
@@ -25,8 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
-import java.util.TimerTask;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.Constants;
@@ -37,65 +35,84 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
/**
*
*/
public class LogSorter {
+
private static final Logger log = Logger.getLogger(LogSorter.class);
FileSystem fs;
AccumuloConfiguration conf;
- private Map<String,Work> currentWork = new HashMap<String,Work>();
+ private Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
- class Work implements Runnable {
- final String name;
- FSDataInputStream input;
- final String destPath;
- long bytesCopied = -1;
- long sortStart = 0;
- long sortStop = -1;
- private final LogSortNotifier cback;
+ class LogProcessor implements Processor {
- synchronized long getBytesCopied() throws IOException {
- return input == null ? bytesCopied : input.getPos();
+ private FSDataInputStream input;
+ private long bytesCopied = -1;
+ private long sortStart = 0;
+ private long sortStop = -1;
+
+ @Override
+ public Processor newProcessor() {
+ return new LogProcessor();
+ }
+
+ @Override
+ public void process(String child, byte[] data) {
+ String dest = Constants.getRecoveryDir(conf) + "/" + child;
+ String src = new String(data);
+ String name = new Path(src).getName();
+
+ synchronized (currentWork) {
+ if (currentWork.containsKey(name))
+ return;
+ currentWork.put(name, this);
+ }
+
+ try {
+ log.info("Copying " + src + " to " + dest);
+ sort(name, new Path(src), dest);
+ } finally {
+ currentWork.remove(name);
+ }
+
}
- Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) {
- this.name = name;
- this.input = input;
- this.destPath = destPath;
- this.cback = cback;
- }
- synchronized boolean finished() {
- return input == null;
- }
- public void run() {
- sortStart = System.currentTimeMillis();
+ public void sort(String name, Path srcPath, String destPath) {
+
+ synchronized (this) {
+ sortStart = System.currentTimeMillis();
+ }
+
String formerThreadName = Thread.currentThread().getName();
int part = 0;
try {
+
+ // the following call does not throw an exception if the file/dir does not exist
+ fs.delete(new Path(destPath), true);
+
+ FSDataInputStream tmpInput = fs.open(srcPath);
+ synchronized (this) {
+ this.input = tmpInput;
+ }
+
final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
Thread.currentThread().setName("Sorting " + name + " for recovery");
while (true) {
- final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+ final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
try {
long start = input.getPos();
while (input.getPos() - start < bufferSize) {
@@ -103,29 +120,26 @@ public class LogSorter {
LogFileValue value = new LogFileValue();
key.readFields(input);
value.readFields(input);
- buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+ buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
}
- writeBuffer(buffer, part++);
+ writeBuffer(destPath, buffer, part++);
buffer.clear();
} catch (EOFException ex) {
- writeBuffer(buffer, part++);
+ writeBuffer(destPath, buffer, part++);
break;
}
}
fs.create(new Path(destPath, "finished")).close();
- log.debug("Log copy/sort of " + name + " complete");
+ log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
} catch (Throwable t) {
try {
+ // parent dir may not exist
+ fs.mkdirs(new Path(destPath));
fs.create(new Path(destPath, "failed")).close();
} catch (IOException e) {
log.error("Error creating failed flag file " + name, e);
}
log.error(t, t);
- try {
- cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString());
- } catch (Exception ex) {
- log.error("Strange error notifying the master of a logSort problem for file " + name);
- }
} finally {
Thread.currentThread().setName(formerThreadName);
try {
@@ -133,19 +147,13 @@ public class LogSorter {
} catch (IOException e) {
log.error("Error during cleanup sort/copy " + name, e);
}
- sortStop = System.currentTimeMillis();
- synchronized (currentWork) {
- currentWork.remove(name);
- }
- try {
- cback.notice(name, getBytesCopied(), part, getSortTime(), "");
- } catch (Exception ex) {
- log.error("Strange error reporting successful log sort " + name, ex);
+ synchronized (this) {
+ sortStop = System.currentTimeMillis();
}
}
}
- private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+ private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
String path = destPath + String.format("/part-r-%05d", part++);
MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
try {
@@ -162,7 +170,7 @@ public class LogSorter {
output.close();
}
}
-
+
synchronized void close() throws IOException {
bytesCopied = input.getPos();
input.close();
@@ -177,9 +185,13 @@ public class LogSorter {
}
return 0;
}
- };
+
+ synchronized long getBytesCopied() throws IOException {
+ return input == null ? bytesCopied : input.getPos();
+ }
+ }
- final ThreadPoolExecutor threadPool;
+ ThreadPoolExecutor threadPool;
private Instance instance;
public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
@@ -189,132 +201,16 @@ public class LogSorter {
int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
}
-
- public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
- final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
- final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
- zoo.mkdirs(path);
- List<String> children = zoo.getChildren(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- switch (event.getType()) {
- case NodeChildrenChanged:
- if (event.getPath().equals(path))
- try {
- attemptRecoveries(zoo, serverName, path, zoo.getChildren(path, this));
- } catch (KeeperException e) {
- log.error("Unable to get recovery information", e);
- } catch (InterruptedException e) {
- log.info("Interrupted getting recovery information", e);
- }
- else
- log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
- break;
- case NodeCreated:
- case NodeDataChanged:
- case NodeDeleted:
- case None:
- log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
- break;
-
- }
- }
- });
- attemptRecoveries(zoo, serverName, path, children);
- Random r = new Random();
- // Add a little jitter to avoid all the tservers slamming zookeeper at once
- SimpleTimer.getInstance().schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
- } catch (KeeperException e) {
- log.error("Unable to get recovery information", e);
- } catch (InterruptedException e) {
- log.info("Interrupted getting recovery information", e);
- }
- }
- }, r.nextInt(1000), 60 * 1000);
- }
-
- private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, final String path, List<String> children) {
- if (children.size() == 0)
- return;
-
- if (threadPool.getQueue().size() > 1)
- return;
- log.debug("Zookeeper references " + children.size() + " recoveries, attempting locks");
- Random random = new Random();
- Collections.shuffle(children, random);
- try {
- for (String child : children) {
- final String childPath = path + "/" + child;
- log.debug("Attempting to lock " + child);
- ZooLock lock = new ZooLock(childPath);
- if (lock.tryLock(new LockWatcher() {
- @Override
- public void lostLock(LockLossReason reason) {
- log.info("Ignoring lost lock event, reason " + reason);
- }
- }, serverName.getBytes())) {
- // Great... we got the lock, but maybe we're too busy
- if (threadPool.getQueue().size() > 1) {
- lock.unlock();
- log.debug("got the lock, but thread pool is busy; released the lock on " + child);
- break;
- }
- log.debug("got lock for " + child);
- byte[] contents = zoo.getData(childPath, null);
- String destination = Constants.getRecoveryDir(conf) + "/" + child;
- startSort(new String(contents), destination, new LogSortNotifier() {
- @Override
- public void notice(String name, long bytes, int parts, long milliseconds, String error) {
- log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms");
- try {
- zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
- } catch (Exception e) {
- log.error("Error received when trying to delete recovery entry in zookeeper " + childPath);
- }
- try {
- attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
- } catch (KeeperException e) {
- log.error("Unable to get recovery information", e);
- } catch (InterruptedException e) {
- log.info("Interrupted getting recovery information", e);
- }
- }
- });
- } else {
- log.debug("failed to get the lock " + child);
- }
- }
- } catch (Throwable t) {
- log.error("Unexpected error", t);
- }
- }
-
- public interface LogSortNotifier {
- public void notice(String name, long bytes, int parts, long milliseconds, String error);
- }
-
- private void startSort(String src, String dest, LogSortNotifier cback) throws IOException {
- log.info("Copying " + src + " to " + dest);
- fs.delete(new Path(dest), true);
- Path srcPath = new Path(src);
- synchronized (currentWork) {
- Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
- if (!currentWork.containsKey(srcPath.getName())) {
- threadPool.execute(work);
- currentWork.put(srcPath.getName(), work);
- }
- }
+ public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
+ this.threadPool = distWorkQThreadPool;
+ new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
}
public List<RecoveryStatus> getLogSorts() {
List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
synchronized (currentWork) {
- for (Entry<String,Work> entries : currentWork.entrySet()) {
+ for (Entry<String,LogProcessor> entries : currentWork.entrySet()) {
RecoveryStatus status = new RecoveryStatus();
status.name = entries.getKey();
try {