Posted to commits@accumulo.apache.org by ec...@apache.org on 2014/01/10 22:37:35 UTC

[1/5] git commit: ACCUMULO-2160 back-port real bugs found by findbugs

Updated Branches:
  refs/heads/1.6.0-SNAPSHOT deae04f67 -> 7c309097c


ACCUMULO-2160 back-port real bugs found by findbugs


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6d49e1a4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6d49e1a4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6d49e1a4

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 6d49e1a48af4dfb0b9d5a8bcf152567b97dd9c79
Parents: 5dd6f84
Author: Eric Newton <er...@gmail.com>
Authored: Thu Jan 9 15:56:35 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Thu Jan 9 15:56:35 2014 -0500

----------------------------------------------------------------------
 src/server/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/6d49e1a4/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --git a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
index 4e909a7..253962b 100644
--- a/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/src/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@ -117,7 +117,7 @@ public class Accumulo {
       System.setProperty("org.apache.accumulo.core.host.log", localhost);
     
     // Use a specific log config, if it exists
-    String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
+    String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
     if (!new File(logConfig).exists()) {
       // otherwise, use the generic config
       logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
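
The one-line change above fixes a real runtime failure rather than a style nit: the format string has two %s conversions but the original call supplies only one argument, so String.format throws MissingFormatArgumentException before the logger config path is ever built. A minimal, self-contained sketch of the failure and the fix (the paths and application name are illustrative, not Accumulo's):

  import java.util.MissingFormatArgumentException;

  public class FormatBugDemo {
    public static void main(String[] args) {
      try {
        // Two %s conversions, one argument: fails at runtime, which is
        // exactly the kind of bug findbugs can flag statically.
        String bad = String.format("%s/%s_logger.xml", "/etc/accumulo/conf");
        System.out.println(bad); // never reached
      } catch (MissingFormatArgumentException e) {
        System.out.println("format failed: " + e);
      }
      // The committed fix supplies the second argument:
      String good = String.format("%s/%s_logger.xml", "/etc/accumulo/conf", "tserver");
      System.out.println(good); // prints /etc/accumulo/conf/tserver_logger.xml
    }
  }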


[2/5] git commit: Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT

Posted by ec...@apache.org.
Merge branch '1.4.5-SNAPSHOT' into 1.5.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ee9035fa
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ee9035fa
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ee9035fa

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: ee9035fac4ed6eac1ad60c29ed0b6d5bd46248ec
Parents: cb50a74 6d49e1a
Author: Eric Newton <er...@gmail.com>
Authored: Thu Jan 9 15:58:16 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Thu Jan 9 15:58:16 2014 -0500

----------------------------------------------------------------------
 server/src/main/java/org/apache/accumulo/server/Accumulo.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/ee9035fa/server/src/main/java/org/apache/accumulo/server/Accumulo.java
----------------------------------------------------------------------
diff --cc server/src/main/java/org/apache/accumulo/server/Accumulo.java
index f56dfd8,0000000..33bb871
mode 100644,000000..100644
--- a/server/src/main/java/org/apache/accumulo/server/Accumulo.java
+++ b/server/src/main/java/org/apache/accumulo/server/Accumulo.java
@@@ -1,309 -1,0 +1,309 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.server;
 +
 +import java.io.File;
 +import java.io.FileInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.lang.reflect.Method;
 +import java.net.InetAddress;
 +import java.net.UnknownHostException;
 +import java.util.Map.Entry;
 +import java.util.TreeMap;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.trace.DistributedTrace;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.core.util.Version;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.client.HdfsZooInstance;
 +import org.apache.accumulo.server.conf.ServerConfiguration;
 +import org.apache.accumulo.server.util.time.SimpleTimer;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hdfs.DistributedFileSystem;
 +import org.apache.log4j.LogManager;
 +import org.apache.log4j.Logger;
 +import org.apache.log4j.helpers.FileWatchdog;
 +import org.apache.log4j.helpers.LogLog;
 +import org.apache.log4j.xml.DOMConfigurator;
 +import org.apache.zookeeper.KeeperException;
 +import org.apache.zookeeper.WatchedEvent;
 +import org.apache.zookeeper.Watcher;
 +
 +public class Accumulo {
 +  
 +  private static final Logger log = Logger.getLogger(Accumulo.class);
 +  
 +  public static synchronized void updateAccumuloVersion(FileSystem fs) {
 +    try {
 +      if (getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) {
 +        fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.DATA_VERSION));
 +        fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.PREV_DATA_VERSION), false);
 +      }
 +    } catch (IOException e) {
 +      throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
 +    }
 +  }
 +  
 +  public static synchronized int getAccumuloPersistentVersion(FileSystem fs) {
 +    int dataVersion;
 +    try {
 +      FileStatus[] files = fs.listStatus(ServerConstants.getDataVersionLocation());
 +      if (files == null || files.length == 0) {
 +        dataVersion = -1; // assume it is 0.5 or earlier
 +      } else {
 +        dataVersion = Integer.parseInt(files[0].getPath().getName());
 +      }
 +      return dataVersion;
 +    } catch (IOException e) {
 +      throw new RuntimeException("Unable to read accumulo version: an error occurred.", e);
 +    }
 +  }
 +  
 +  public static void enableTracing(String address, String application) {
 +    try {
 +      DistributedTrace.enable(HdfsZooInstance.getInstance(), ZooReaderWriter.getInstance(), application, address);
 +    } catch (Exception ex) {
 +      log.error("creating remote sink for trace spans", ex);
 +    }
 +  }
 +  
 +  private static class LogMonitor extends FileWatchdog implements Watcher {
 +    String path;
 +    
 +    protected LogMonitor(String instance, String filename, int delay) {
 +      super(filename);
 +      setDelay(delay);
 +      this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_PORT;
 +    }
 +    
 +    private void setMonitorPort() {
 +      try {
 +        String port = new String(ZooReaderWriter.getInstance().getData(path, null));
 +        System.setProperty("org.apache.accumulo.core.host.log.port", port);
 +        log.info("Changing monitor log4j port to "+port);
 +        doOnChange();
 +      } catch (Exception e) {
 +        log.error("Error reading zookeeper data for monitor log4j port", e);
 +      }
 +    }
 +    
 +    @Override
 +    public void run() {
 +      try {
 +        if (ZooReaderWriter.getInstance().getZooKeeper().exists(path, this) != null)
 +          setMonitorPort();
 +        log.info("Set watch for monitor log4j port");
 +      } catch (Exception e) {
 +        log.error("Unable to set watch for monitor log4j port " + path);
 +      }
 +      super.run();
 +    }
 +    
 +    @Override
 +    protected void doOnChange() {
 +      LogManager.resetConfiguration();
 +      new DOMConfigurator().doConfigure(filename, LogManager.getLoggerRepository());
 +    }
 +    
 +    @Override
 +    public void process(WatchedEvent event) {
 +      setMonitorPort();
 +      if (event.getPath() != null) {
 +        try {
 +          ZooReaderWriter.getInstance().exists(event.getPath(), this);
 +        } catch (Exception ex) {
 +          log.error("Unable to reset watch for monitor log4j port", ex);
 +        }
 +      }
 +    }
 +  }
 +  
 +  public static void init(FileSystem fs, ServerConfiguration config, String application) throws UnknownHostException {
 +    
 +    System.setProperty("org.apache.accumulo.core.application", application);
 +    
 +    if (System.getenv("ACCUMULO_LOG_DIR") != null)
 +      System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_LOG_DIR"));
 +    else
 +      System.setProperty("org.apache.accumulo.core.dir.log", System.getenv("ACCUMULO_HOME") + "/logs/");
 +    
 +    String localhost = InetAddress.getLocalHost().getHostName();
 +    System.setProperty("org.apache.accumulo.core.ip.localhost.hostname", localhost);
 +    
 +    if (System.getenv("ACCUMULO_LOG_HOST") != null)
 +      System.setProperty("org.apache.accumulo.core.host.log", System.getenv("ACCUMULO_LOG_HOST"));
 +    else
 +      System.setProperty("org.apache.accumulo.core.host.log", localhost);
 +    
 +    int logPort = config.getConfiguration().getPort(Property.MONITOR_LOG4J_PORT);
 +    System.setProperty("org.apache.accumulo.core.host.log.port", Integer.toString(logPort));
 +    
 +    // Use a specific log config, if it exists
-     String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
++    String logConfig = String.format("%s/%s_logger.xml", System.getenv("ACCUMULO_CONF_DIR"), application);
 +    if (!new File(logConfig).exists()) {
 +      // otherwise, use the generic config
 +      logConfig = String.format("%s/generic_logger.xml", System.getenv("ACCUMULO_CONF_DIR"));
 +    }
 +    // Turn off messages about not being able to reach the remote logger... we protect against that.
 +    LogLog.setQuietMode(true);
 +    
 +    // Configure logging
 +    if (logPort==0)
 +      new LogMonitor(config.getInstance().getInstanceID(), logConfig, 5000).start();
 +    else
 +      DOMConfigurator.configureAndWatch(logConfig, 5000);
 +    
 +    log.info(application + " starting");
 +    log.info("Instance " + config.getInstance().getInstanceID());
 +    int dataVersion = Accumulo.getAccumuloPersistentVersion(fs);
 +    log.info("Data Version " + dataVersion);
 +    Accumulo.waitForZookeeperAndHdfs(fs);
 +    
 +    Version codeVersion = new Version(Constants.VERSION);
 +    if (dataVersion != Constants.DATA_VERSION && dataVersion != Constants.PREV_DATA_VERSION) {
 +      throw new RuntimeException("This version of accumulo (" + codeVersion + ") is not compatible with files stored using data version " + dataVersion);
 +    }
 +    
 +    TreeMap<String,String> sortedProps = new TreeMap<String,String>();
 +    for (Entry<String,String> entry : config.getConfiguration())
 +      sortedProps.put(entry.getKey(), entry.getValue());
 +    
 +    for (Entry<String,String> entry : sortedProps.entrySet()) {
 +      if (entry.getKey().toLowerCase().contains("password") || entry.getKey().toLowerCase().contains("secret")
 +          || entry.getKey().startsWith(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey()))
 +        log.info(entry.getKey() + " = <hidden>");
 +      else
 +        log.info(entry.getKey() + " = " + entry.getValue());
 +    }
 +    
 +    monitorSwappiness();
 +  }
 +  
 +  /**
 +   * Periodically check /proc/sys/vm/swappiness and log a warning when it is
 +   * set above 10, since aggressive swapping can delay the time-sensitive
 +   * operations Accumulo depends on for distributed lock agreement.
 +   */
 +  public static void monitorSwappiness() {
 +    SimpleTimer.getInstance().schedule(new Runnable() {
 +      @Override
 +      public void run() {
 +        try {
 +          String procFile = "/proc/sys/vm/swappiness";
 +          File swappiness = new File(procFile);
 +          if (swappiness.exists() && swappiness.canRead()) {
 +            InputStream is = new FileInputStream(procFile);
 +            try {
 +              byte[] buffer = new byte[10];
 +              int bytes = is.read(buffer);
 +              String setting = new String(buffer, 0, bytes);
 +              setting = setting.trim();
 +              if (bytes > 0 && Integer.parseInt(setting) > 10) {
 +                log.warn("System swappiness setting is greater than ten (" + setting + ") which can cause time-sensitive operations to be delayed. "
 +                    + " Accumulo is time sensitive because it needs to maintain distributed lock agreement.");
 +              }
 +            } finally {
 +              is.close();
 +            }
 +          }
 +        } catch (Throwable t) {
 +          log.error(t, t);
 +        }
 +      }
 +    }, 1000, 10 * 60 * 1000);
 +  }
 +  
 +  public static String getLocalAddress(String[] args) throws UnknownHostException {
 +    InetAddress result = InetAddress.getLocalHost();
 +    for (int i = 0; i < args.length - 1; i++) {
 +      if (args[i].equals("-a") || args[i].equals("--address")) {
 +        result = InetAddress.getByName(args[i + 1]);
 +        log.debug("Local address is: " + args[i + 1] + " (" + result.toString() + ")");
 +        break;
 +      }
 +    }
 +    return result.getHostName();
 +  }
 +  
 +  public static void waitForZookeeperAndHdfs(FileSystem fs) {
 +    log.info("Attempting to talk to zookeeper");
 +    while (true) {
 +      try {
 +        ZooReaderWriter.getInstance().getChildren(Constants.ZROOT);
 +        break;
 +      } catch (InterruptedException e) {
 +        // ignored
 +      } catch (KeeperException ex) {
 +        log.info("Waiting for accumulo to be initialized");
 +        UtilWaitThread.sleep(1000);
 +      }
 +    }
 +    log.info("Zookeeper connected and initialized, attemping to talk to HDFS");
 +    long sleep = 1000;
 +    while (true) {
 +      try {
 +        if (!isInSafeMode(fs))
 +          break;
 +        log.warn("Waiting for the NameNode to leave safemode");
 +      } catch (IOException ex) {
 +        log.warn("Unable to connect to HDFS");
 +      }
 +      log.info("Sleeping " + sleep / 1000. + " seconds");
 +      UtilWaitThread.sleep(sleep);
 +      sleep = Math.min(60 * 1000, sleep * 2);
 +    }
 +    log.info("Connected to HDFS");
 +  }
 +  
 +  private static boolean isInSafeMode(FileSystem fs) throws IOException {
 +    if (!(fs instanceof DistributedFileSystem))
 +      return false;
 +    DistributedFileSystem dfs = (DistributedFileSystem)fs;
 +    // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
 +    // Becomes this:
 +    Class<?> safeModeAction;
 +    try {
 +      // hadoop 2.0
 +      safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
 +    } catch (ClassNotFoundException ex) {
 +      // hadoop 1.0
 +      try {
 +        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
 +      } catch (ClassNotFoundException e) {
 +        throw new RuntimeException("Cannot figure out the right class for Constants");
 +      }
 +    }
 +    Object get = null;
 +    for (Object obj : safeModeAction.getEnumConstants()) {
 +      if (obj.toString().equals("SAFEMODE_GET"))
 +        get = obj;
 +    }
 +    if (get == null) {
 +      throw new RuntimeException("cannot find SAFEMODE_GET");
 +    }
 +    try {
 +      Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
 +      return (Boolean) setSafeMode.invoke(dfs, get);
 +    } catch (Exception ex) {
 +      throw new RuntimeException("cannot find method setSafeMode");
 +    }
 +  }
 +}
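
The reflection block at the end of isInSafeMode() above exists because SafeModeAction moved between Hadoop generations: HdfsConstants$SafeModeAction in Hadoop 2, FSConstants$SafeModeAction in Hadoop 1. The same pattern, distilled into a standalone sketch (typed against Object so it compiles without Hadoop on the build path; a sketch of the idiom, not a drop-in replacement):

  import java.lang.reflect.Method;

  public class SafeModeCompat {

    // Resolve whichever SafeModeAction class is on the classpath.
    private static Class<?> resolveSafeModeAction() {
      try {
        return Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction"); // Hadoop 2
      } catch (ClassNotFoundException ex) {
        try {
          return Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction"); // Hadoop 1
        } catch (ClassNotFoundException e) {
          throw new RuntimeException("no known SafeModeAction class on the classpath", e);
        }
      }
    }

    // fs is expected to be a DistributedFileSystem at runtime.
    public static boolean isInSafeMode(Object fs) {
      Class<?> safeModeAction = resolveSafeModeAction();
      Object get = null;
      for (Object constant : safeModeAction.getEnumConstants()) {
        if (constant.toString().equals("SAFEMODE_GET"))
          get = constant;
      }
      if (get == null)
        throw new RuntimeException("cannot find SAFEMODE_GET");
      try {
        Method setSafeMode = fs.getClass().getMethod("setSafeMode", safeModeAction);
        return (Boolean) setSafeMode.invoke(fs, get);
      } catch (Exception ex) {
        throw new RuntimeException("cannot invoke setSafeMode", ex);
      }
    }
  }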


[5/5] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Posted by ec...@apache.org.
Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT

Conflicts:
	server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/7c309097
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/7c309097
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/7c309097

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 7c309097c40779e96d51dd6c70333b5fb1cb8dd7
Parents: deae04f 3b41d37
Author: Eric Newton <er...@gmail.com>
Authored: Fri Jan 10 16:37:19 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Fri Jan 10 16:37:19 2014 -0500

----------------------------------------------------------------------
 .../apache/accumulo/tserver/log/DfsLogger.java  | 56 +++++++++-----------
 .../tserver/log/TabletServerLogger.java         |  4 +-
 2 files changed, 28 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/7c309097/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 571d1bc,0000000..cc28ac2
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@@ -1,548 -1,0 +1,544 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.log;
 +
 +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH;
 +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START;
 +import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
 +import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS;
 +import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
 +
 +import java.io.DataInputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.io.OutputStream;
 +import java.lang.reflect.Method;
 +import java.nio.channels.ClosedChannelException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.LinkedBlockingQueue;
 +
 +import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.security.crypto.CryptoModule;
 +import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 +import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 +import org.apache.accumulo.core.security.crypto.DefaultCryptoModule;
 +import org.apache.accumulo.core.security.crypto.NoFlushOutputStream;
 +import org.apache.accumulo.core.util.Daemon;
++import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.StringUtil;
 +import org.apache.accumulo.server.ServerConstants;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.master.state.TServerInstance;
 +import org.apache.accumulo.tserver.TabletMutations;
 +import org.apache.accumulo.tserver.logger.LogFileKey;
 +import org.apache.accumulo.tserver.logger.LogFileValue;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Wrap a connection to a logger.
 + * 
 + */
 +public class DfsLogger {
 +  // Package private so that LogSorter can find this
 +  static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---";
 +  static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---";
 +  
 +  private static Logger log = Logger.getLogger(DfsLogger.class);
 +  
 +  public static class LogClosedException extends IOException {
 +    private static final long serialVersionUID = 1L;
 +    
 +    public LogClosedException() {
 +      super("LogClosed");
 +    }
 +  }
 +  
 +  public static class DFSLoggerInputStreams {
 +    
 +    private FSDataInputStream originalInput;
 +    private DataInputStream decryptingInputStream;
 +
 +    public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream) {
 +      this.originalInput = originalInput;
 +      this.decryptingInputStream = decryptingInputStream;
 +    }
 +
 +    public FSDataInputStream getOriginalInput() {
 +      return originalInput;
 +    }
 +
 +    public void setOriginalInput(FSDataInputStream originalInput) {
 +      this.originalInput = originalInput;
 +    }
 +
 +    public DataInputStream getDecryptingInputStream() {
 +      return decryptingInputStream;
 +    }
 +
 +    public void setDecryptingInputStream(DataInputStream decryptingInputStream) {
 +      this.decryptingInputStream = decryptingInputStream;
 +    }
 +  }
 +  
 +  
 +  public interface ServerResources {
 +    AccumuloConfiguration getConfiguration();
 +    
 +    VolumeManager getFileSystem();
 +    
 +    Set<TServerInstance> getCurrentTServers();
 +  }
 +  
 +  private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<DfsLogger.LogWork>();
 +  
 +  private final Object closeLock = new Object();
 +  
-   private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, null);
++  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null);
 +  
 +  private static final LogFileValue EMPTY = new LogFileValue();
 +  
 +  private boolean closed = false;
 +  
 +  private class LogSyncingTask implements Runnable {
 +    
 +    @Override
 +    public void run() {
 +      ArrayList<DfsLogger.LogWork> work = new ArrayList<DfsLogger.LogWork>();
 +      while (true) {
 +        work.clear();
 +        
 +        try {
 +          work.add(workQueue.take());
 +        } catch (InterruptedException ex) {
 +          continue;
 +        }
 +        workQueue.drainTo(work);
 +        
 +        synchronized (closeLock) {
 +          if (!closed) {
 +            try {
 +              sync.invoke(logFile);
 +            } catch (Exception ex) {
 +              log.warn("Exception syncing " + ex);
 +              for (DfsLogger.LogWork logWork : work) {
 +                logWork.exception = ex;
 +              }
 +            }
 +          } else {
 +            for (DfsLogger.LogWork logWork : work) {
 +              logWork.exception = new LogClosedException();
 +            }
 +          }
 +        }
 +        
 +        boolean sawClosedMarker = false;
 +        for (DfsLogger.LogWork logWork : work)
 +          if (logWork == CLOSED_MARKER)
 +            sawClosedMarker = true;
 +          else
 +            logWork.latch.countDown();
 +        
 +        if (sawClosedMarker) {
 +          synchronized (closeLock) {
 +            closeLock.notifyAll();
 +          }
 +          break;
 +        }
 +      }
 +    }
 +  }
 +  
 +  static class LogWork {
-     List<TabletMutations> mutations;
 +    CountDownLatch latch;
 +    volatile Exception exception;
-     
-     public LogWork(List<TabletMutations> mutations, CountDownLatch latch) {
-       this.mutations = mutations;
++
++    public LogWork(CountDownLatch latch) {
 +      this.latch = latch;
 +    }
 +  }
 +  
 +  public static class LoggerOperation {
 +    private final LogWork work;
 +    
 +    public LoggerOperation(LogWork work) {
 +      this.work = work;
 +    }
 +    
 +    public void await() throws IOException {
 +      try {
 +        work.latch.await();
 +      } catch (InterruptedException e) {
 +        throw new RuntimeException(e);
 +      }
 +      
 +      if (work.exception != null) {
 +        if (work.exception instanceof IOException)
 +          throw (IOException) work.exception;
 +        else if (work.exception instanceof RuntimeException)
 +          throw (RuntimeException) work.exception;
 +        else
 +          throw new RuntimeException(work.exception);
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  public boolean equals(Object obj) {
 +    // filename is unique
 +    if (obj == null)
 +      return false;
 +    if (obj instanceof DfsLogger)
 +      return getFileName().equals(((DfsLogger) obj).getFileName());
 +    return false;
 +  }
 +  
 +  @Override
 +  public int hashCode() {
 +    // filename is unique
 +    return getFileName().hashCode();
 +  }
 +  
 +  private final ServerResources conf;
 +  private FSDataOutputStream logFile;
 +  private DataOutputStream encryptingLogFile = null;
 +  private Method sync;
 +  private String logPath;
 +  
 +  public DfsLogger(ServerResources conf) throws IOException {
 +    this.conf = conf;
 +  }
 +  
 +  public DfsLogger(ServerResources conf, String filename) throws IOException {
 +    this.conf = conf;
 +    this.logPath = filename;
 +  }
 +  
 +  public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException {
 +    FSDataInputStream input = fs.open(path);
 +    DataInputStream decryptingInput = null;
 +    
 +    byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes();
 +    byte[] magicBuffer = new byte[magic.length];
 +    input.readFully(magicBuffer);
 +    if (Arrays.equals(magicBuffer, magic)) {
 +      // Read the crypto module class name; the module will pull any additional parameters it needs from the underlying stream.
 +      String cryptoModuleClassname = input.readUTF();
 +      CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname);
 +
 +      // Create the parameters and set the input stream into those parameters
 +      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 +      params.setEncryptedInputStream(input);
 +
 +      // Create the plaintext input stream from the encrypted one
 +      params = cryptoModule.getDecryptingInputStream(params);
 +
 +      if (params.getPlaintextInputStream() instanceof DataInputStream) {
 +        decryptingInput = (DataInputStream) params.getPlaintextInputStream();
 +      } else {
 +        decryptingInput = new DataInputStream(params.getPlaintextInputStream());
 +      }
 +    } else {
 +      input.seek(0);
 +      byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes();
 +      byte[] magicBufferV2 = new byte[magic.length];
 +      input.readFully(magicBufferV2);
 +
 +      if (Arrays.equals(magicBufferV2, magicV2)) {
 +        // Log files from 1.5 dump their options in raw to the logger files.  Since we don't know the class
 +        // that needs to read those files, we can make a couple of basic assumptions.  Either it's going to be
 +        // the NullCryptoModule (no crypto) or the DefaultCryptoModule.
 +        
 +        // If it's null, we won't have any parameters whatsoever.  First, let's attempt to read 
 +        // parameters
 +        Map<String,String> opts = new HashMap<String,String>();
 +        int count = input.readInt();
 +        for (int i = 0; i < count; i++) {
 +          String key = input.readUTF();
 +          String value = input.readUTF();
 +          opts.put(key, value);
 +        }
 +        
 +        if (opts.size() == 0) {
 +          // NullCryptoModule, we're done
 +          decryptingInput = input;
 +        } else {
 +          
 +          // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot.
 +          org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory
 +              .getCryptoModule(DefaultCryptoModule.class.getName());
 +
 +          CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf);
 +          
 +          input.seek(0);
 +          input.readFully(magicBuffer);
 +          params.setEncryptedInputStream(input);
 +
 +          params = cryptoModule.getDecryptingInputStream(params);
 +          if (params.getPlaintextInputStream() instanceof DataInputStream) {
 +            decryptingInput = (DataInputStream) params.getPlaintextInputStream();
 +          } else {
 +            decryptingInput = new DataInputStream(params.getPlaintextInputStream());
 +          }
 +        }
 +        
 +      } else {
 +
 +        input.seek(0);
 +        decryptingInput = input;
 +      }
 +
 +    }
 +    return new DFSLoggerInputStreams(input, decryptingInput);
 +  }
 +  
 +  public synchronized void open(String address) throws IOException {
 +    String filename = UUID.randomUUID().toString();
 +    String logger = StringUtil.join(Arrays.asList(address.split(":")), "+");
 +
 +    log.debug("DfsLogger.open() begin");
 +    VolumeManager fs = conf.getFileSystem();
 +    
 +    logPath = fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename;
 +    try {
 +      short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
 +      if (replication == 0)
 +        replication = fs.getDefaultReplication(new Path(logPath));
 +      long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
 +      if (blockSize == 0)
 +        blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
 +      if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
 +        logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize);
 +      else
 +        logFile = fs.create(new Path(logPath), true, 0, replication, blockSize);
 +      
 +      try {
 +        NoSuchMethodException e = null;
 +        try {
 +          // sync: send data to datanodes
 +          sync = logFile.getClass().getMethod("sync");
 +        } catch (NoSuchMethodException ex) {
 +          e = ex;
 +        }
 +        try {
 +          // hsync: send data to datanodes and sync the data to disk
 +          sync = logFile.getClass().getMethod("hsync");
 +          e = null;
 +        } catch (NoSuchMethodException ex) {}
 +        if (e != null)
 +          throw new RuntimeException(e);
 +      } catch (Exception e) {
 +        throw new RuntimeException(e);
 +      }
 +      
 +      // Initialize the crypto operations.
 +      org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf
 +          .getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
 +      
 +      // Initialize the log file with a header and the crypto params used to set up this log file.
 +      logFile.write(LOG_FILE_HEADER_V3.getBytes());
 +
 +      CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration());
 +      
 +      params.setPlaintextOutputStream(new NoFlushOutputStream(logFile));
 +      
 +      // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here,
 +      // so that the same crypto module can re-read its own parameters.
 +      
 +      logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS));
 +      
 +      
 +      params = cryptoModule.getEncryptingOutputStream(params);
 +      OutputStream encipheringOutputStream = params.getEncryptedOutputStream();
 +      
 +      // If the module just kicks back our original stream, then just use it, don't wrap it in
 +      // another data OutputStream.
 +      if (encipheringOutputStream == logFile) {
 +        encryptingLogFile = logFile;
 +      } else {
 +        encryptingLogFile = new DataOutputStream(encipheringOutputStream);
 +      }
 +      
 +      LogFileKey key = new LogFileKey();
 +      key.event = OPEN;
 +      key.tserverSession = filename;
 +      key.filename = filename;
 +      write(key, EMPTY);
 +      sync.invoke(logFile);
 +      log.debug("Got new write-ahead log: " + this);
 +    } catch (Exception ex) {
 +      if (logFile != null)
 +        logFile.close();
 +      logFile = null;
 +      encryptingLogFile = null;
 +      throw new IOException(ex);
 +    }
 +    
 +    Thread t = new Daemon(new LogSyncingTask());
 +    t.setName("Accumulo WALog thread " + toString());
 +    t.start();
 +  }
 +  
 +  @Override
 +  public String toString() {
 +    String fileName = getFileName();
 +    if (fileName.contains(":"))
 +      return getLogger() + "/" + getFileName();
 +    return fileName;
 +  }
 +  
 +  public String getFileName() {
 +    return logPath.toString();
 +  }
 +  
 +  public void close() throws IOException {
 +    
 +    synchronized (closeLock) {
 +      if (closed)
 +        return;
 +      // after closed is set to true, nothing else should be added to the queue
 +      // CLOSED_MARKER should be the last thing on the queue, therefore when the
 +      // background thread sees the marker and exits there should be nothing else
 +      // to process... so nothing should be left waiting for the background
 +      // thread to do work
 +      closed = true;
 +      workQueue.add(CLOSED_MARKER);
 +      while (!workQueue.isEmpty())
 +        try {
 +          closeLock.wait();
 +        } catch (InterruptedException e) {
 +          log.info("Interrupted");
 +        }
 +    }
 +    
 +    if (encryptingLogFile != null)
 +      try {
 +        encryptingLogFile.close();
 +      } catch (IOException ex) {
 +        log.error(ex);
 +        throw new LogClosedException();
 +      }
 +  }
 +  
 +  public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException {
 +    // write this log to the METADATA table
 +    final LogFileKey key = new LogFileKey();
 +    key.event = DEFINE_TABLET;
 +    key.seq = seq;
 +    key.tid = tid;
 +    key.tablet = tablet;
 +    try {
 +      write(key, EMPTY);
 +      sync.invoke(logFile);
 +    } catch (Exception ex) {
 +      log.error(ex);
 +      throw new IOException(ex);
 +    }
 +  }
 +  
 +  /**
 +   * Write the given key/value pair to the encrypting log stream and flush it.
 +   * 
 +   * @throws IOException
 +   */
 +  private synchronized void write(LogFileKey key, LogFileValue value) throws IOException {
 +    key.write(encryptingLogFile);
 +    value.write(encryptingLogFile);
 +    encryptingLogFile.flush();
 +  }
 +  
 +  public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
 +    return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
 +  }
 +  
-   public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
-     DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new CountDownLatch(1));
-     
++  private LoggerOperation logFileData(List<Pair<LogFileKey, LogFileValue>> keys) throws IOException {
++    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1));
 +    synchronized (DfsLogger.this) {
 +      try {
-         for (TabletMutations tabletMutations : mutations) {
-           LogFileKey key = new LogFileKey();
-           key.event = MANY_MUTATIONS;
-           key.seq = tabletMutations.getSeq();
-           key.tid = tabletMutations.getTid();
-           LogFileValue value = new LogFileValue();
-           value.mutations = tabletMutations.getMutations();
-           write(key, value);
++        for (Pair<LogFileKey,LogFileValue> pair : keys) {
++          write(pair.getFirst(), pair.getSecond());
 +        }
 +      } catch (ClosedChannelException ex) {
 +        throw new LogClosedException();
 +      } catch (Exception e) {
 +        log.error(e, e);
 +        work.exception = e;
 +      }
 +    }
 +    
 +    synchronized (closeLock) {
 +      // use a different lock for close check so that adding to work queue does not need
 +      // to wait on walog I/O operations
 +      
 +      if (closed)
 +        throw new LogClosedException();
 +      workQueue.add(work);
 +    }
 +    
 +    return new LoggerOperation(work);
 +  }
 +  
-   public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
++  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
++    List<Pair<LogFileKey, LogFileValue>> data = new ArrayList<Pair<LogFileKey, LogFileValue>>();
++    for (TabletMutations tabletMutations : mutations) {
++      LogFileKey key = new LogFileKey();
++      key.event = MANY_MUTATIONS;
++      key.seq = tabletMutations.getSeq();
++      key.tid = tabletMutations.getTid();
++      LogFileValue value = new LogFileValue();
++      value.mutations = tabletMutations.getMutations();
++      data.add(new Pair<LogFileKey, LogFileValue>(key, value));
++    }
++    return logFileData(data);
++  }
++
++  public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    key.event = COMPACTION_FINISH;
 +    key.seq = seq;
 +    key.tid = tid;
-     try {
-       write(key, EMPTY);
-     } catch (IOException ex) {
-       log.error(ex);
-       throw ex;
-     }
++    return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY)));
 +  }
 +  
-   public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
++  public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
 +    LogFileKey key = new LogFileKey();
 +    key.event = COMPACTION_START;
 +    key.seq = seq;
 +    key.tid = tid;
 +    key.filename = fqfn;
-     try {
-       write(key, EMPTY);
-     } catch (IOException ex) {
-       log.error(ex);
-       throw ex;
-     }
++    return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY)));
 +  }
 +
 +  public String getLogger() {
 +    String parts[] = logPath.split("/");
 +    return StringUtil.join(Arrays.asList(parts[parts.length - 2].split("[+]")), ":");
 +  }
 +  
 +}
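
One detail worth calling out in the DfsLogger above: LogSyncingTask implements a group commit. The syncing thread blocks on take() for the first work item, drains whatever else queued in the meantime, pays for a single sync()/hsync() covering the whole batch, and then releases each waiter's latch. A minimal standalone sketch of that batching (generic work items, no HDFS involved):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.CountDownLatch;
  import java.util.concurrent.LinkedBlockingQueue;

  public class GroupCommitDemo {
    static class Work {
      final CountDownLatch latch = new CountDownLatch(1);
    }

    private final LinkedBlockingQueue<Work> queue = new LinkedBlockingQueue<Work>();

    // Producer side, as in logFileData(): enqueue, then block on the latch.
    Work submit() {
      Work w = new Work();
      queue.add(w);
      return w;
    }

    // Consumer side, run on a single background thread.
    void syncLoop() throws InterruptedException {
      List<Work> batch = new ArrayList<Work>();
      while (true) {
        batch.clear();
        batch.add(queue.take());  // block for the first item...
        queue.drainTo(batch);     // ...then grab everything else waiting
        expensiveSync();          // one sync covers the whole batch
        for (Work w : batch)
          w.latch.countDown();    // release every caller blocked in await()
      }
    }

    private void expensiveSync() {
      // stands in for sync.invoke(logFile)
    }
  }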

http://git-wip-us.apache.org/repos/asf/accumulo/blob/7c309097/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index a276a97,0000000..fb90757
mode 100644,000000..100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@@ -1,430 -1,0 +1,430 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.tserver.log;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.ReadWriteLock;
 +import java.util.concurrent.locks.ReentrantReadWriteLock;
 +
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.KeyExtent;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.tserver.Tablet;
 +import org.apache.accumulo.tserver.Tablet.CommitSession;
 +import org.apache.accumulo.tserver.TabletMutations;
 +import org.apache.accumulo.tserver.TabletServer;
 +import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.log4j.Logger;
 +
 +/**
 + * Central logging facility for the TServerInfo.
 + * 
 + * Forwards in-memory updates to remote logs, carefully writing the same data to every log, while maintaining the maximum thread parallelism for greater
 + * performance. As new logs are used and minor compactions are performed, the metadata table is kept up-to-date.
 + * 
 + */
 +public class TabletServerLogger {
 +  
 +  private static final Logger log = Logger.getLogger(TabletServerLogger.class);
 +  
 +  private final AtomicLong logSizeEstimate = new AtomicLong();
 +  private final long maxSize;
 +  
 +  private final TabletServer tserver;
 +  
 +  // The current log set: always updated to a new set with every change of loggers
 +  private final List<DfsLogger> loggers = new ArrayList<DfsLogger>();
 +  
 +  // The current generation of logSet.
 +  // Because multiple threads can be using a log set at one time, a log
 +  // failure is likely to affect multiple threads, who will all attempt to
 +  // create a new logSet. This will cause many unnecessary updates to the
 +  // metadata table.
 +  // We'll use this generational counter to determine if another thread has
 +  // already fetched a new logSet.
 +  private AtomicInteger logSetId = new AtomicInteger();
 +  
 +  // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them
 +  private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock();
 +  
 +  private final AtomicInteger seqGen = new AtomicInteger();
 +  
 +  private static boolean enabled(Tablet tablet) {
 +    return tablet.getTableConfiguration().getBoolean(Property.TABLE_WALOG_ENABLED);
 +  }
 +  
 +  private static boolean enabled(CommitSession commitSession) {
 +    return enabled(commitSession.getTablet());
 +  }
 +  
 +  static private abstract class TestCallWithWriteLock {
 +    abstract boolean test();
 +    
 +    abstract void withWriteLock() throws IOException;
 +  }
 +  
 +  /**
 +   * Pattern taken from the documentation for ReentrantReadWriteLock
 +   * 
 +   * @param rwlock
 +   *          lock to use
 +   * @param code
 +   *          a test/work pair
 +   * @throws IOException
 +   */
 +  private static void testLockAndRun(final ReadWriteLock rwlock, TestCallWithWriteLock code) throws IOException {
 +    // Get a read lock
 +    rwlock.readLock().lock();
 +    try {
 +      // does some condition exist that needs the write lock?
 +      if (code.test()) {
 +        // Yes, let go of the readlock
 +        rwlock.readLock().unlock();
 +        // Grab the write lock
 +        rwlock.writeLock().lock();
 +        try {
 +          // double-check the condition, since we let go of the lock
 +          if (code.test()) {
 +            // perform the work with the write lock held
 +            code.withWriteLock();
 +          }
 +        } finally {
 +          // regain the readlock
 +          rwlock.readLock().lock();
 +          // unlock the write lock
 +          rwlock.writeLock().unlock();
 +        }
 +      }
 +    } finally {
 +      // always let go of the lock
 +      rwlock.readLock().unlock();
 +    }
 +  }
 +  
 +  public TabletServerLogger(TabletServer tserver, long maxSize) {
 +    this.tserver = tserver;
 +    this.maxSize = maxSize;
 +  }
 +  
 +  private int initializeLoggers(final List<DfsLogger> copy) throws IOException {
 +    final int[] result = {-1};
 +    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
 +      boolean test() {
 +        copy.clear();
 +        copy.addAll(loggers);
 +        if (!loggers.isEmpty())
 +          result[0] = logSetId.get();
 +        return loggers.isEmpty();
 +      }
 +      
 +      void withWriteLock() throws IOException {
 +        try {
 +          createLoggers();
 +          copy.clear();
 +          copy.addAll(loggers);
 +          if (copy.size() > 0)
 +            result[0] = logSetId.get();
 +          else
 +            result[0] = -1;
 +        } catch (IOException e) {
 +          log.error("Unable to create loggers", e);
 +        }
 +      }
 +    });
 +    return result[0];
 +  }
 +  
 +  public void getLogFiles(Set<String> loggersOut) {
 +    logSetLock.readLock().lock();
 +    try {
 +      for (DfsLogger logger : loggers) {
 +        loggersOut.add(logger.getFileName());
 +      }
 +    } finally {
 +      logSetLock.readLock().unlock();
 +    }
 +  }
 +  
 +  synchronized private void createLoggers() throws IOException {
 +    if (!logSetLock.isWriteLockedByCurrentThread()) {
 +      throw new IllegalStateException("createLoggers should be called with write lock held!");
 +    }
 +    
 +    if (loggers.size() != 0) {
 +      throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size());
 +    }
 +    
 +    try {
 +      DfsLogger alog = new DfsLogger(tserver.getServerConfig());
 +      alog.open(tserver.getClientAddressString());
 +      loggers.add(alog);
 +      logSetId.incrementAndGet();
 +      return;
 +    } catch (Exception t) {
 +      throw new RuntimeException(t);
 +    }
 +  }
 +  
 +  public void resetLoggers() throws IOException {
 +    logSetLock.writeLock().lock();
 +    try {
 +      close();
 +    } finally {
 +      logSetLock.writeLock().unlock();
 +    }
 +  }
 +  
 +  synchronized private void close() throws IOException {
 +    if (!logSetLock.isWriteLockedByCurrentThread()) {
 +      throw new IllegalStateException("close should be called with write lock held!");
 +    }
 +    try {
 +      for (DfsLogger logger : loggers) {
 +        try {
 +          logger.close();
 +        } catch (DfsLogger.LogClosedException ex) {
 +          // ignore
 +        } catch (Throwable ex) {
 +          log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex);
 +        }
 +      }
 +      loggers.clear();
 +      logSizeEstimate.set(0);
 +    } catch (Throwable t) {
 +      throw new IOException(t);
 +    }
 +  }
 +  
 +  interface Writer {
 +    LoggerOperation write(DfsLogger logger, int seq) throws Exception;
 +  }
 +  
 +  private int write(CommitSession commitSession, boolean mincFinish, Writer writer) throws IOException {
 +    List<CommitSession> sessions = Collections.singletonList(commitSession);
 +    return write(sessions, mincFinish, writer);
 +  }
 +  
 +  private int write(Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException {
 +    // Work very hard not to lock this during calls to the outside world
 +    int currentLogSet = logSetId.get();
 +    
 +    int seq = -1;
 +    
 +    int attempt = 0;
 +    boolean success = false;
 +    while (!success) {
 +      try {
 +        // get a reference to the loggers that no other thread can touch
 +        ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>();
 +        currentLogSet = initializeLoggers(copy);
 +        
 +        // add the logger to the log set for the memory in the tablet,
 +        // update the metadata table if we've never used this tablet
 +        
 +        if (currentLogSet == logSetId.get()) {
 +          for (CommitSession commitSession : sessions) {
 +            if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) {
 +              try {
 +                // Scribble out a tablet definition and then write to the metadata table
 +                defineTablet(commitSession);
 +                if (currentLogSet == logSetId.get())
 +                  tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId());
 +              } finally {
 +                commitSession.finishUpdatingLogsUsed();
 +              }
 +            }
 +          }
 +        }
 +        
 +        // Make sure that the logs haven't changed out from underneath our copy
 +        if (currentLogSet == logSetId.get()) {
 +          
 +          // write the mutation to the logs
 +          seq = seqGen.incrementAndGet();
 +          if (seq < 0)
 +            throw new RuntimeException("Logger sequence generator wrapped!  Onos!!!11!eleven");
 +          ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size());
 +          for (DfsLogger wal : copy) {
 +            LoggerOperation lop = writer.write(wal, seq);
 +            if (lop != null)
 +              queuedOperations.add(lop);
 +          }
 +          
 +          for (LoggerOperation lop : queuedOperations) {
 +            lop.await();
 +          }
 +          
 +          // double-check: did the log set change?
 +          success = (currentLogSet == logSetId.get());
 +        }
 +      } catch (DfsLogger.LogClosedException ex) {
 +        log.debug("Logs closed while writing, retrying " + (attempt + 1));
 +      } catch (Exception t) {
 +        log.error("Unexpected error writing to log, retrying attempt " + (attempt + 1), t);
 +        UtilWaitThread.sleep(100);
 +      } finally {
 +        attempt++;
 +      }
 +      // Some sort of write failure occurred. Grab the write lock and reset the logs.
 +      // But since multiple threads will attempt it, only attempt the reset when
 +      // the logs haven't changed.
 +      final int finalCurrent = currentLogSet;
 +      if (!success) {
 +        testLockAndRun(logSetLock, new TestCallWithWriteLock() {
 +          
 +          @Override
 +          boolean test() {
 +            return finalCurrent == logSetId.get();
 +          }
 +          
 +          @Override
 +          void withWriteLock() throws IOException {
 +            close();
 +          }
 +        });
 +      }
 +    }
 +    // if the log gets too big, reset it .. grab the write lock first
 +    logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead
 +    testLockAndRun(logSetLock, new TestCallWithWriteLock() {
 +      boolean test() {
 +        return logSizeEstimate.get() > maxSize;
 +      }
 +      
 +      void withWriteLock() throws IOException {
 +        close();
 +      }
 +    });
 +    return seq;
 +  }
 +  
 +  public int defineTablet(final CommitSession commitSession) throws IOException {
 +    // scribble this into the metadata tablet, too.
 +    if (!enabled(commitSession))
 +      return -1;
 +    return write(commitSession, false, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
 +        logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent());
 +        return null;
 +      }
 +    });
 +  }
 +  
 +  public int log(final CommitSession commitSession, final int tabletSeq, final Mutation m) throws IOException {
 +    if (!enabled(commitSession))
 +      return -1;
 +    int seq = write(commitSession, false, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
 +        return logger.log(tabletSeq, commitSession.getLogId(), m);
 +      }
 +    });
 +    logSizeEstimate.addAndGet(m.numBytes());
 +    return seq;
 +  }
 +  
 +  public int logManyTablets(Map<CommitSession,List<Mutation>> mutations) throws IOException {
 +    
 +    final Map<CommitSession,List<Mutation>> loggables = new HashMap<CommitSession,List<Mutation>>(mutations);
 +    for (CommitSession t : mutations.keySet()) {
 +      if (!enabled(t))
 +        loggables.remove(t);
 +    }
 +    if (loggables.size() == 0)
 +      return -1;
 +    
 +    int seq = write(loggables.keySet(), false, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
 +        List<TabletMutations> copy = new ArrayList<TabletMutations>(loggables.size());
 +        for (Entry<CommitSession,List<Mutation>> entry : loggables.entrySet()) {
 +          CommitSession cs = entry.getKey();
 +          copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), entry.getValue()));
 +        }
 +        return logger.logManyTablets(copy);
 +      }
 +    });
 +    for (List<Mutation> entry : loggables.values()) {
 +      if (entry.size() < 1)
 +        throw new IllegalArgumentException("logManyTablets: logging empty mutation list");
 +      for (Mutation m : entry) {
 +        logSizeEstimate.addAndGet(m.numBytes());
 +      }
 +    }
 +    return seq;
 +  }
 +  
 +  public void minorCompactionFinished(final CommitSession commitSession, final String fullyQualifiedFileName, final int walogSeq) throws IOException {
 +    
 +    if (!enabled(commitSession))
 +      return;
 +    
 +    long t1 = System.currentTimeMillis();
 +    
 +    int seq = write(commitSession, true, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-         logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
++        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName).await();
 +        return null;
 +      }
 +    });
 +    
 +    long t2 = System.currentTimeMillis();
 +    
 +    log.debug(" wrote MinC finish  " + seq + ": writeTime:" + (t2 - t1) + "ms ");
 +  }
 +  
 +  public int minorCompactionStarted(final CommitSession commitSession, final int seq, final String fullyQualifiedFileName) throws IOException {
 +    if (!enabled(commitSession))
 +      return -1;
 +    write(commitSession, false, new Writer() {
 +      @Override
 +      public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-         logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
++        logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName).await();
 +        return null;
 +      }
 +    });
 +    return seq;
 +  }
 +  
 +  public void recover(VolumeManager fs, Tablet tablet, List<Path> logs, Set<String> tabletFiles, MutationReceiver mr) throws IOException {
 +    if (!enabled(tablet))
 +      return;
 +    try {
 +      SortedLogRecovery recovery = new SortedLogRecovery(fs);
 +      KeyExtent extent = tablet.getExtent();
 +      recovery.recover(extent, logs, tabletFiles, mr);
 +    } catch (Exception e) {
 +      throw new IOException(e);
 +    }
 +  }
 +  
 +}
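
The testLockAndRun() helper above is the lock-downgrade idiom from the ReentrantReadWriteLock javadoc, as its comment notes: test under the read lock, release it, re-test under the write lock (another thread may have done the work in the gap), act, then reacquire the read lock before releasing the write lock. Condensed into a standalone sketch:

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  public class TestThenWriteLock {
    private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
    private volatile boolean needsReset = true;

    void maybeReset() {
      rwlock.readLock().lock();
      try {
        if (needsReset) {
          rwlock.readLock().unlock();    // cannot upgrade read -> write directly
          rwlock.writeLock().lock();
          try {
            if (needsReset) {            // re-check: the lock was dropped above
              needsReset = false;        // the "work", done under the write lock
            }
          } finally {
            rwlock.readLock().lock();    // downgrade: take the read lock...
            rwlock.writeLock().unlock(); // ...before letting the write lock go
          }
        }
      } finally {
        rwlock.readLock().unlock();
      }
    }
  }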


[4/5] git commit: ACCUMULO-2172 wait for minor compaction flags to be flushed to the WAL

Posted by ec...@apache.org.
ACCUMULO-2172 wait for minor compaction flags to be flushed to the WAL
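
The change (the same one merged into 1.6.0 above) turns minorCompactionStarted() and minorCompactionFinished() from synchronized void writes into methods that return a LoggerOperation, so callers can await() the background WAL sync instead of treating a merely buffered COMPACTION_START/FINISH record as durable. A sketch of the latch mechanics behind await() (names mirror the diff; the class is illustrative, not the real DfsLogger):

  import java.io.IOException;
  import java.util.concurrent.CountDownLatch;

  public class DurableFlagDemo {
    static class LoggerOperation {
      final CountDownLatch latch = new CountDownLatch(1);
      volatile Exception exception;

      void await() throws IOException {
        try {
          latch.await(); // released by the sync thread only after the sync completes
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
        if (exception instanceof IOException)
          throw (IOException) exception; // surface a failed sync to the caller
      }
    }

    // Before the fix, the caller returned as soon as the COMPACTION_FINISH
    // key was buffered; a crash before the background sync could lose it.
    // After the fix, the caller does:
    //   logger.minorCompactionFinished(seq, tid, fqfn).await();
    // and proceeds only once the flag is durable in the write-ahead log.
  }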


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3b41d37e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3b41d37e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3b41d37e

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 3b41d37ed27b5dc8724b427ed6970b083464bd94
Parents: 4822b13
Author: Eric Newton <er...@gmail.com>
Authored: Fri Jan 10 16:27:01 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Fri Jan 10 16:27:01 2014 -0500

----------------------------------------------------------------------
 .../server/tabletserver/log/DfsLogger.java      | 58 +++++++++-----------
 .../tabletserver/log/TabletServerLogger.java    |  4 +-
 2 files changed, 29 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/3b41d37e/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
index 213c885..ded2962 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.Daemon;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.StringUtil;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
@@ -89,7 +90,7 @@ public class DfsLogger {
 
   private final Object closeLock = new Object();
 
-  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null, null);
+  private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null);
 
   private static final LogFileValue EMPTY = new LogFileValue();
 
@@ -145,12 +146,10 @@ public class DfsLogger {
   }
 
   static class LogWork {
-    List<TabletMutations> mutations;
     CountDownLatch latch;
     volatile Exception exception;
 
-    public LogWork(List<TabletMutations> mutations, CountDownLatch latch) {
-      this.mutations = mutations;
+    public LogWork(CountDownLatch latch) {
       this.latch = latch;
     }
   }
@@ -439,27 +438,20 @@ public class DfsLogger {
   public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException {
     return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation))));
   }
-
-  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
-    DfsLogger.LogWork work = new DfsLogger.LogWork(mutations, new CountDownLatch(1));
-
+  
+  private LoggerOperation logFileData(List<Pair<LogFileKey, LogFileValue>> keys) throws IOException {
+    DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1));
     synchronized (DfsLogger.this) {
       try {
-        for (TabletMutations tabletMutations : mutations) {
-          LogFileKey key = new LogFileKey();
-          key.event = MANY_MUTATIONS;
-          key.seq = tabletMutations.getSeq();
-          key.tid = tabletMutations.getTid();
-          LogFileValue value = new LogFileValue();
-          value.mutations = tabletMutations.getMutations();
-          write(key, value);
+        for (Pair<LogFileKey,LogFileValue> pair : keys) {
+          write(pair.getFirst(), pair.getSecond());
         }
       } catch (Exception e) {
         log.error(e, e);
         work.exception = e;
       }
     }
-
+    
     synchronized (closeLock) {
       // use a different lock for close check so that adding to work queue does not need
       // to wait on walog I/O operations
@@ -472,31 +464,35 @@ public class DfsLogger {
     return new LoggerOperation(work);
   }
 
-  public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
+  public LoggerOperation logManyTablets(List<TabletMutations> mutations) throws IOException {
+    List<Pair<LogFileKey, LogFileValue>> data = new ArrayList<Pair<LogFileKey, LogFileValue>>();
+    for (TabletMutations tabletMutations : mutations) {
+      LogFileKey key = new LogFileKey();
+      key.event = MANY_MUTATIONS;
+      key.seq = tabletMutations.getSeq();
+      key.tid = tabletMutations.getTid();
+      LogFileValue value = new LogFileValue();
+      value.mutations = tabletMutations.getMutations();
+      data.add(new Pair<LogFileKey, LogFileValue>(key, value));
+    }
+    return logFileData(data);
+  }
+
+  public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException {
     LogFileKey key = new LogFileKey();
     key.event = COMPACTION_FINISH;
     key.seq = seq;
     key.tid = tid;
-    try {
-      write(key, EMPTY);
-    } catch (IOException ex) {
-      log.error(ex);
-      throw ex;
-    }
+    return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY)));
   }
 
-  public synchronized void minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
+  public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException {
     LogFileKey key = new LogFileKey();
     key.event = COMPACTION_START;
     key.seq = seq;
     key.tid = tid;
     key.filename = fqfn;
-    try {
-      write(key, EMPTY);
-    } catch (IOException ex) {
-      log.error(ex);
-      throw ex;
-    }
+    return logFileData(Collections.singletonList(new Pair<LogFileKey, LogFileValue>(key, EMPTY)));
   }
 
 }
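
The shape of this refactor is worth calling out: three formerly separate write paths (mutation batches, COMPACTION_START, COMPACTION_FINISH) now all funnel through the single private logFileData, so the monitor, the error capture, and the closed-marker check live in exactly one place, and every record type hands its caller the same waitable LoggerOperation. The marker methods also lose their own `synchronized` keyword; serialization moves into the shared path, and blocking becomes the caller's choice via await(). A condensed structural sketch, with Key and Operation as simplified stand-ins for the LogFileKey/LogFileValue pairs and LoggerOperation (not the real Accumulo types):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    public class FunnelSketch {

      static final class Key { String event; int seq; int tid; }

      static final class Operation {
        void await() { /* would block on a CountDownLatch, as in the sketch above */ }
      }

      // Single choke point: all record types share the same locking and queueing rules.
      private synchronized Operation logFileData(List<Key> keys) throws IOException {
        for (Key k : keys) {
          // the real method writes each (LogFileKey, LogFileValue) pair here and
          // records any exception on the returned handle instead of throwing
        }
        return new Operation();
      }

      // Thin wrappers only build records; durability policy stays in one place.
      public Operation minorCompactionFinished(int seq, int tid) throws IOException {
        Key key = new Key();
        key.event = "COMPACTION_FINISH";
        key.seq = seq;
        key.tid = tid;
        return logFileData(Collections.singletonList(key));
      }
    }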

http://git-wip-us.apache.org/repos/asf/accumulo/blob/3b41d37e/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
index dd9d3fa..6ca1ad0 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
@@ -390,7 +390,7 @@ public class TabletServerLogger {
     int seq = write(commitSession, true, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName);
+        logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName).await();
         return null;
       }
     });
@@ -406,7 +406,7 @@ public class TabletServerLogger {
     write(commitSession, false, new Writer() {
       @Override
       public LoggerOperation write(DfsLogger logger, int ignored) throws Exception {
-        logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName);
+        logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName).await();
         return null;
       }
     });
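
Note the division of labor this preserves on the TabletServerLogger side: the Writer callbacks for the two marker methods await() inline and return null, while logManyTablets still returns its LoggerOperation so the surrounding write(...) machinery can wait on it later. The net effect is the guarantee named in the subject: minorCompactionStarted and minorCompactionFinished do not return until the flag record has been flushed to the WAL.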


[3/5] git commit: Merge branch '1.5.1-SNAPSHOT' of https://git-wip-us.apache.org/repos/asf/accumulo into 1.5.1-SNAPSHOT

Posted by ec...@apache.org.
Merge branch '1.5.1-SNAPSHOT' of https://git-wip-us.apache.org/repos/asf/accumulo into 1.5.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4822b131
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4822b131
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4822b131

Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 4822b1318fe20cb76067991ee70675172f82bd28
Parents: ee9035f d0f626d
Author: Eric Newton <er...@gmail.com>
Authored: Fri Jan 10 10:14:49 2014 -0500
Committer: Eric Newton <er...@gmail.com>
Committed: Fri Jan 10 10:14:49 2014 -0500

----------------------------------------------------------------------
 .../minicluster/MiniAccumuloCluster.java        |  7 +++--
 .../minicluster/MiniAccumuloConfig.java         | 17 ------------
 .../minicluster/MiniAccumuloClusterGCTest.java  | 27 --------------------
 3 files changed, 3 insertions(+), 48 deletions(-)
----------------------------------------------------------------------