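You are viewing a plain text version of this content. The canonical version is in the commits@accumulo.apache.org mailing-list archive (the original hyperlink was not preserved in this plain-text copy).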
Posted to commits@accumulo.apache.org by kt...@apache.org on 2012/07/05 18:19:06 UTC

svn commit: r1357721 - in /accumulo/trunk/server/src/main/java/org/apache/accumulo/server: master/recovery/SubmitFileForRecovery.java tabletserver/TabletServer.java tabletserver/log/LogSorter.java

Author: kturner
Date: Thu Jul  5 16:19:05 2012
New Revision: 1357721

URL: http://svn.apache.org/viewvc?rev=1357721&view=rev
Log:
ACCUMULO-409 Modified log recovery to use new DistributedWorkQueue

Modified:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java?rev=1357721&r1=1357720&r2=1357721&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/SubmitFileForRecovery.java Thu Jul  5 16:19:05 2012
@@ -21,9 +21,9 @@ import java.io.IOException;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.master.Master;
 import org.apache.accumulo.server.master.tableOps.MasterRepo;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,18 +48,19 @@ public class SubmitFileForRecovery exten
   public Repo<Master> call(long tid, final Master master) throws Exception {
     master.updateRecoveryInProgress(file);
     String source = RecoverLease.getSource(master, server, file).toString();
+    new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes());
+    
     ZooReaderWriter zoo = ZooReaderWriter.getInstance();
     final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
-    zoo.putPersistentData(path, source.getBytes(), NodeExistsPolicy.SKIP);
     log.info("Created zookeeper entry " + path + " with data " + source);
     zoo.exists(path, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
         switch (event.getType()) {
-          case NodeDataChanged:
+          case NodeDeleted:
             log.info("noticed recovery entry for " + file + " was removed");
             FileSystem fs = master.getFileSystem();
-            Path finished = new Path(Constants.getRecoveryDir(master.getSystemConfiguration()), "finished");
+            Path finished = new Path(Constants.getRecoveryDir(master.getSystemConfiguration()) + "/" + file, "finished");
             try {
               if (fs.exists(finished))
                 log.info("log recovery for " + file + " successful");

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1357721&r1=1357720&r2=1357721&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Thu Jul  5 16:19:05 2012
@@ -51,7 +51,6 @@ import java.util.concurrent.ArrayBlockin
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -126,9 +125,9 @@ import org.apache.accumulo.core.util.Cac
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
-import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.Stat;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.util.UtilWaitThread;
@@ -2714,22 +2713,22 @@ public class TabletServer extends Abstra
     }
     clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
     announceExistence();
-    try {
-      logSorter.startWatchingForRecoveryLogs(getClientAddressString());
-    } catch (Exception ex) {
-      log.error("Error setting watches for recoveries");
-      throw new RuntimeException(ex);
-    }
     
-    ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
-        new NamingThreadFactory("distributed work queue"));
+    ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
 
-    bulkFailedCopyQ = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ);
+    bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ);
     try {
       bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
     } catch (Exception e1) {
       throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
     }
+    
+    try {
+      logSorter.startWatchingForRecoveryLogs(distWorkQThreadPool);
+    } catch (Exception ex) {
+      log.error("Error setting watches for recoveries");
+      throw new RuntimeException(ex);
+    }
 
     try {
       OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1357721&r1=1357720&r2=1357721&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Thu Jul  5 16:19:05 2012
@@ -25,8 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
-import java.util.TimerTask;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.accumulo.core.Constants;
@@ -37,65 +35,84 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
-import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.accumulo.server.zookeeper.ZooLock;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 
 /**
  * 
  */
 public class LogSorter {
   
+
   private static final Logger log = Logger.getLogger(LogSorter.class);
   FileSystem fs;
   AccumuloConfiguration conf;
   
-  private Map<String,Work> currentWork = new HashMap<String,Work>();
+  private Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<String,LogProcessor>());
 
-  class Work implements Runnable {
-    final String name;
-    FSDataInputStream input;
-    final String destPath;
-    long bytesCopied = -1;
-    long sortStart = 0;
-    long sortStop = -1;
-    private final LogSortNotifier cback;
+  class LogProcessor implements Processor {
     
-    synchronized long getBytesCopied() throws IOException {
-      return input == null ? bytesCopied : input.getPos();
+    private FSDataInputStream input;
+    private long bytesCopied = -1;
+    private long sortStart = 0;
+    private long sortStop = -1;
+    
+    @Override
+    public Processor newProcessor() {
+      return new LogProcessor();
+    }
+    
+    @Override
+    public void process(String child, byte[] data) {
+      String dest = Constants.getRecoveryDir(conf) + "/" + child;
+      String src = new String(data);
+      String name = new Path(src).getName();
+      
+      synchronized (currentWork) {
+        if (currentWork.containsKey(name))
+          return;
+        currentWork.put(name, this);
+      }
+      
+      try {
+        log.info("Copying " + src + " to " + dest);
+        sort(name, new Path(src), dest);
+      } finally {
+        currentWork.remove(name);
+      }
+      
     }
     
-    Work(String name, FSDataInputStream input, String destPath, LogSortNotifier cback) {
-      this.name = name;
-      this.input = input;
-      this.destPath = destPath;
-      this.cback = cback;
-    }
-    synchronized boolean finished() {
-      return input == null;
-    }
-    public void run() {
-      sortStart = System.currentTimeMillis();
+    public void sort(String name, Path srcPath, String destPath) {
+
+      synchronized (this) {
+        sortStart = System.currentTimeMillis();
+      }
+
       String formerThreadName = Thread.currentThread().getName();
       int part = 0;
       try {
+        
+        // the following call does not throw an exception if the file/dir does not exist
+        fs.delete(new Path(destPath), true);
+
+        FSDataInputStream tmpInput = fs.open(srcPath);
+        synchronized (this) {
+          this.input = tmpInput;
+        }
+
         final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
         Thread.currentThread().setName("Sorting " + name + " for recovery");
         while (true) {
-          final ArrayList<Pair<LogFileKey, LogFileValue>> buffer = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+          final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<Pair<LogFileKey,LogFileValue>>();
           try {
             long start = input.getPos();
             while (input.getPos() - start < bufferSize) {
@@ -103,29 +120,26 @@ public class LogSorter {
               LogFileValue value = new LogFileValue();
               key.readFields(input);
               value.readFields(input);
-              buffer.add(new Pair<LogFileKey, LogFileValue>(key, value));
+              buffer.add(new Pair<LogFileKey,LogFileValue>(key, value));
             }
-            writeBuffer(buffer, part++);
+            writeBuffer(destPath, buffer, part++);
             buffer.clear();
           } catch (EOFException ex) {
-            writeBuffer(buffer, part++);
+            writeBuffer(destPath, buffer, part++);
             break;
           }
         }
         fs.create(new Path(destPath, "finished")).close();
-        log.debug("Log copy/sort of " + name + " complete");
+        log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
       } catch (Throwable t) {
         try {
+          // parent dir may not exist
+          fs.mkdirs(new Path(destPath));
           fs.create(new Path(destPath, "failed")).close();
         } catch (IOException e) {
           log.error("Error creating failed flag file " + name, e);
         }
         log.error(t, t);
-        try {
-          cback.notice(name, getBytesCopied(), part, getSortTime(), t.toString());
-        } catch (Exception ex) {
-          log.error("Strange error notifying the master of a logSort problem for file " + name);
-        }
       } finally {
         Thread.currentThread().setName(formerThreadName);
         try {
@@ -133,19 +147,13 @@ public class LogSorter {
         } catch (IOException e) {
           log.error("Error during cleanup sort/copy " + name, e);
         }
-        sortStop = System.currentTimeMillis();
-        synchronized (currentWork) {
-          currentWork.remove(name);
-        }
-        try {
-          cback.notice(name, getBytesCopied(), part, getSortTime(), "");
-        } catch (Exception ex) {
-          log.error("Strange error reporting successful log sort " + name, ex);
+        synchronized (this) {
+          sortStop = System.currentTimeMillis();
         }
       }
     }
     
-    private void writeBuffer(ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
+    private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
       String path = destPath + String.format("/part-r-%05d", part++);
       MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
       try {
@@ -162,7 +170,7 @@ public class LogSorter {
         output.close();
       }
     }
-    
+
     synchronized void close() throws IOException {
       bytesCopied = input.getPos();
       input.close();
@@ -177,9 +185,13 @@ public class LogSorter {
       }
       return 0;
     }
-  };
+    
+    synchronized long getBytesCopied() throws IOException {
+      return input == null ? bytesCopied : input.getPos();
+    }
+  }
   
-  final ThreadPoolExecutor threadPool;
+  ThreadPoolExecutor threadPool;
   private Instance instance;
   
   public LogSorter(Instance instance, FileSystem fs, AccumuloConfiguration conf) {
@@ -189,132 +201,16 @@ public class LogSorter {
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
     this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
   }
-  
-  public void startWatchingForRecoveryLogs(final String serverName) throws KeeperException, InterruptedException {
-    final String path = ZooUtil.getRoot(instance) + Constants.ZRECOVERY;
-    final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    zoo.mkdirs(path);
-    List<String> children = zoo.getChildren(path, new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        switch (event.getType()) {
-          case NodeChildrenChanged:
-            if (event.getPath().equals(path))
-              try {
-                attemptRecoveries(zoo, serverName, path, zoo.getChildren(path, this));
-              } catch (KeeperException e) {
-                log.error("Unable to get recovery information", e);
-              } catch (InterruptedException e) {
-                log.info("Interrupted getting recovery information", e);
-              }
-            else
-              log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
-            break;
-          case NodeCreated:
-          case NodeDataChanged:
-          case NodeDeleted:
-          case None:
-            log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
-            break;
-          
-        }
-      }
-    });
-    attemptRecoveries(zoo, serverName, path, children);
-    Random r = new Random();
-    // Add a little jitter to avoid all the tservers slamming zookeeper at once
-    SimpleTimer.getInstance().schedule(new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
-        } catch (KeeperException e) {
-          log.error("Unable to get recovery information", e);
-        } catch (InterruptedException e) {
-          log.info("Interrupted getting recovery information", e);
-        }        
-      }
-    }, r.nextInt(1000), 60 * 1000);
-  }
-  
-  private void attemptRecoveries(final ZooReaderWriter zoo, final String serverName, final String path, List<String> children) {
-    if (children.size() == 0)
-      return;
-    
-    if (threadPool.getQueue().size() > 1)
-      return;
 
-    log.debug("Zookeeper references " + children.size() + " recoveries, attempting locks");
-    Random random = new Random();
-    Collections.shuffle(children, random);
-    try {
-      for (String child : children) {
-        final String childPath = path + "/" + child;
-        log.debug("Attempting to lock " + child);
-        ZooLock lock = new ZooLock(childPath);
-        if (lock.tryLock(new LockWatcher() {
-          @Override
-          public void lostLock(LockLossReason reason) {
-            log.info("Ignoring lost lock event, reason " + reason);
-          }
-        }, serverName.getBytes())) {
-          // Great... we got the lock, but maybe we're too busy
-          if (threadPool.getQueue().size() > 1) {
-            lock.unlock();
-            log.debug("got the lock, but thread pool is busy; released the lock on " + child);
-            break;
-          }
-          log.debug("got lock for " + child);
-          byte[] contents = zoo.getData(childPath, null);
-          String destination = Constants.getRecoveryDir(conf) + "/" + child;
-          startSort(new String(contents), destination, new LogSortNotifier() {
-            @Override
-            public void notice(String name, long bytes, int parts, long milliseconds, String error) {
-              log.info("Finished log sort " + name + " " + bytes + " bytes " + parts + " parts in " + milliseconds + "ms");
-              try {
-                zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
-              } catch (Exception e) {
-                log.error("Error received when trying to delete recovery entry in zookeeper " + childPath);
-              }
-              try {
-                attemptRecoveries(zoo, serverName, path, zoo.getChildren(path));
-              } catch (KeeperException e) {
-                log.error("Unable to get recovery information", e);
-              } catch (InterruptedException e) {
-                log.info("Interrupted getting recovery information", e);
-              }
-            }
-          });
-        } else {
-          log.debug("failed to get the lock " + child);
-        }
-      }
-    } catch (Throwable t) {
-      log.error("Unexpected error", t);
-    }
-  }
-
-  public interface LogSortNotifier {
-    public void notice(String name, long bytes, int parts, long milliseconds, String error);
-  }
-
-  private void startSort(String src, String dest, LogSortNotifier cback) throws IOException {
-    log.info("Copying " + src + " to " + dest);
-    fs.delete(new Path(dest), true);
-    Path srcPath = new Path(src);
-    synchronized (currentWork) {
-      Work work = new Work(srcPath.getName(), fs.open(srcPath), dest, cback);
-      if (!currentWork.containsKey(srcPath.getName())) {
-        threadPool.execute(work);
-        currentWork.put(srcPath.getName(), work);
-      }
-    }
+  public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
+    this.threadPool = distWorkQThreadPool;
+    new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool);
   }
   
   public List<RecoveryStatus> getLogSorts() {
     List<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
     synchronized (currentWork) {
-      for (Entry<String,Work> entries : currentWork.entrySet()) {
+      for (Entry<String,LogProcessor> entries : currentWork.entrySet()) {
         RecoveryStatus status = new RecoveryStatus();
         status.name = entries.getKey();
         try {