You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/05/24 21:59:11 UTC

svn commit: r1486202 [1/2] - in /accumulo/branches/ACCUMULO-118: core/src/main/java/org/apache/accumulo/core/conf/ server/src/main/java/org/apache/accumulo/server/ server/src/main/java/org/apache/accumulo/server/fs/ server/src/main/java/org/apache/accu...

Author: ecn
Date: Fri May 24 19:59:11 2013
New Revision: 1486202

URL: http://svn.apache.org/r1486202
Log:
ACCUMULO-118 route the servers to new FileSystem interface; unit tests and functional tests still run

Added:
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java   (with props)
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java   (with props)
Modified:
    accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
    accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
    accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
    accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
    accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
    accumulo/branches/ACCUMULO-118/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java

Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java Fri May 24 19:59:11 2013
@@ -70,6 +70,8 @@ public enum Property {
       "A secret unique to a given instance that all servers must know in order to communicate with one another."
           + " Change it before initialization. To change it later use ./bin/accumulo accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd], "
           + " and then update conf/accumulo-site.xml everywhere."),
+  INSTANCE_NAMESPACES("instance.namespaces", "", PropertyType.STRING,
+      "A list of namespaces to use."),
   INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME,
       "The authenticator class that accumulo will use to determine if a user has privilege to perform an action"),
   INSTANCE_SECURITY_AUTHORIZOR("instance.security.authorizor", "org.apache.accumulo.server.security.handler.ZKAuthorizor", PropertyType.CLASSNAME,

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java Fri May 24 19:59:11 2013
@@ -20,7 +20,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Map.Entry;
@@ -33,12 +32,11 @@ import org.apache.accumulo.core.util.Uti
 import org.apache.accumulo.core.util.Version;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.log4j.Logger;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.xml.DOMConfigurator;
@@ -52,7 +50,7 @@ public class Accumulo {
     try {
       if (getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) {
         fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.DATA_VERSION));
-        fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.PREV_DATA_VERSION), false);
+        fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.PREV_DATA_VERSION));
       }
     } catch (IOException e) {
       throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
@@ -62,7 +60,7 @@ public class Accumulo {
   public static synchronized int getAccumuloPersistentVersion(FileSystem fs) {
     int dataVersion;
     try {
-      FileStatus[] files = fs.listStatus(ServerConstants.getDataVersionLocation());
+      FileStatus[] files = fs.getDefaultNamespace().listStatus(ServerConstants.getDataVersionLocation());
       if (files == null || files.length == 0) {
         dataVersion = -1; // assume it is 0.5 or earlier
       } else {
@@ -198,7 +196,7 @@ public class Accumulo {
     long sleep = 1000;
     while (true) {
       try {
-        if (!isInSafeMode(fs))
+        if (fs.isReady())
           break;
         log.warn("Waiting for the NameNode to leave safemode");
       } catch (IOException ex) {
@@ -211,37 +209,4 @@ public class Accumulo {
     log.info("Connected to HDFS");
   }
   
-  private static boolean isInSafeMode(FileSystem fs) throws IOException {
-    if (!(fs instanceof DistributedFileSystem))
-      return false;
-    DistributedFileSystem dfs = (DistributedFileSystem)fs;
-    // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
-    // Becomes this:
-    Class<?> safeModeAction;
-    try {
-      // hadoop 2.0
-      safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
-    } catch (ClassNotFoundException ex) {
-      // hadoop 1.0
-      try {
-        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
-      } catch (ClassNotFoundException e) {
-        throw new RuntimeException("Cannot figure out the right class for Constants");
-      }
-    }
-    Object get = null;
-    for (Object obj : safeModeAction.getEnumConstants()) {
-      if (obj.toString().equals("SAFEMODE_GET"))
-        get = obj;
-    }
-    if (get == null) {
-      throw new RuntimeException("cannot find SAFEMODE_GET");
-    }
-    try {
-      Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
-      return (Boolean) setSafeMode.invoke(dfs, get);
-    } catch (Exception ex) {
-      throw new RuntimeException("cannot find method setSafeMode");
-    }
-  }
 }

Added: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java?rev=1486202&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java (added)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java Fri May 24 19:59:11 2013
@@ -0,0 +1,64 @@
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+// Server-side file system facade: FileSystemImpl routes each call to the hadoop file system (namespace) chosen for the path.
+public interface FileSystem {
+  // Closes the underlying hadoop file systems.
+  void close() throws IOException;
+  // Closes a file that may still be open; FileSystemImpl uses HDFS lease recovery, or append-and-close on a local file system.
+  boolean closePossiblyOpenFile(Path path) throws IOException;
+  // Creates a file at dest and returns the stream for writing it.
+  FSDataOutputStream create(Path dest) throws IOException;
+  // As create(Path); b is the overwrite flag.
+  FSDataOutputStream create(Path path, boolean b) throws IOException;
+  // b = overwrite, int1 = buffer size, int2 = replication, long1 = block size (names as used by FileSystemImpl).
+  FSDataOutputStream create(Path path, boolean b, int int1, short int2, long long1) throws IOException;
+  // Delegates to hadoop's createNewFile for the given path.
+  boolean createNewFile(Path writable) throws IOException;
+  // Creates logPath; FileSystemImpl requests the SYNC_BLOCK create flag when the hadoop version has it, else does a plain create.
+  FSDataOutputStream createSyncable(Path logPath, int buffersize, short replication, long blockSize) throws IOException;
+  // Deletes path without recursing (FileSystemImpl passes recursive = false).
+  boolean delete(Path path) throws IOException;
+  // Deletes path recursively (FileSystemImpl passes recursive = true).
+  boolean deleteRecursively(Path path) throws IOException;
+  // Tests whether the path exists.
+  boolean exists(Path newBulkDir) throws IOException;
+  // Returns the hadoop FileStatus for the path.
+  FileStatus getFileStatus(Path errorPath) throws IOException;
+  // Returns the hadoop file system responsible for the given path.
+  org.apache.hadoop.fs.FileSystem getFileSystemByPath(Path path);
+  // String form of getFileSystemByPath(Path).
+  org.apache.hadoop.fs.FileSystem getFileSystemByPath(String path);
+  // Returns all underlying hadoop file systems.
+  Collection<org.apache.hadoop.fs.FileSystem> getFileSystems();
+  // Delegates to hadoop's listStatus for the path.
+  FileStatus[] listStatus(Path path) throws IOException;
+  // Delegates to hadoop's mkdirs for the directory.
+  boolean mkdirs(Path directory) throws IOException;
+  // Opens the file at path and returns the input stream.
+  FSDataInputStream open(Path path) throws IOException;
+  // Renames path to newPath; FileSystemImpl refuses a rename that crosses namespaces.
+  boolean rename(Path path, Path newPath) throws IOException;
+  // Moves sourcePath to the hadoop trash; FileSystemImpl returns the result of Trash.moveToTrash.
+  boolean moveToTrash(Path sourcePath) throws IOException;
+  // Default replication of the file system holding logPath.
+  short getDefaultReplication(Path logPath);
+  // Delegates to hadoop's isFile for the path.
+  boolean isFile(Path path) throws IOException;
+  // In FileSystemImpl: false while any HDFS namespace reports safe mode, true otherwise.
+  boolean isReady() throws IOException;
+  // Returns the hadoop file system of the default namespace.
+  org.apache.hadoop.fs.FileSystem getDefaultNamespace();
+  // Delegates to hadoop's globStatus for the path pattern.
+  FileStatus[] globStatus(Path path) throws IOException;
+  // Returns the fully qualified path string for the file named by a metadata key.
+  String getFullPath(Key key);
+  
+}

Propchange: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java?rev=1486202&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java (added)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java Fri May 24 19:59:11 2013
@@ -0,0 +1,360 @@
+package org.apache.accumulo.server.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+// hide the need to make blocksize be an even number of checksum blocks 
+//
+//      int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
+//      blockSize -= blockSize % checkSum;
+//      blockSize = Math.max(blockSize, checkSum);
+// if buffersize is zero, get it from the filesystem
+// 
+
+public class FileSystemImpl implements org.apache.accumulo.server.fs.FileSystem {
+  
+  private static final Logger log = Logger.getLogger(FileSystemImpl.class); // FileSystem.class here is the imported hadoop type, not this package's
+  
+  Map<String, ? extends FileSystem> namespaces;
+  String defaultNamespace;
+  AccumuloConfiguration conf;
+  
+  protected FileSystemImpl(Map<String, ? extends FileSystem> namespaces, String defaultNamespace, AccumuloConfiguration conf) {
+    this.namespaces = namespaces;
+    this.defaultNamespace = defaultNamespace;
+    this.conf = conf;
+    ensureSyncIsEnabled();
+  }
+  
+  public static org.apache.accumulo.server.fs.FileSystem getLocal() throws IOException {
+    return new FileSystemImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())), "", DefaultConfiguration.getDefaultConfiguration());
+  }
+
+  // Closes every namespace even when some fail; each failure is logged and the last one is rethrown.
+  @Override
+  public void close() throws IOException {
+    IOException last = null;
+    for (FileSystem fs : namespaces.values()) {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        log.warn("Failed to close " + fs.getUri() + ": " + e, e); // earlier failures used to vanish without a trace
+        last = e;
+      }
+    }
+    if (last != null) throw last;
+  }
+  
+  // Attempts to close a file that may still be open. On HDFS the result of recoverLease is returned as-is;
+  // on a local file system the file is opened for append and closed again, and true is returned.
+  // Any other kind of file system is rejected with an IllegalStateException.
+  @Override
+  public boolean closePossiblyOpenFile(Path path) throws IOException {
+    FileSystem namespace = getFileSystemByPath(path);
+    if (namespace instanceof DistributedFileSystem) {
+      // any FileNotFoundException from recoverLease propagates unchanged
+      return ((DistributedFileSystem) namespace).recoverLease(path);
+    }
+    if (!(namespace instanceof LocalFileSystem)) {
+      throw new IllegalStateException("Don't know how to recover a lease for " + namespace.getClass().getName());
+    }
+    // LocalFileSystem only from here on
+    FSDataOutputStream appendStream = namespace.append(path);
+    appendStream.close();
+    log.info("Recovered lease on " + path.toString() + " using append");
+    return true;
+  }
+  
+  @Override
+  public FSDataOutputStream create(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.create(path);
+  }
+  
+  @Override
+  public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.create(path, overwrite);
+  }
+  
+  private static long correctBlockSize(Configuration conf, long blockSize) {
+    if (blockSize <= 0)
+      blockSize = conf.getLong("dfs.block.size", 67108864);
+    
+    int checkSum = conf.getInt("io.bytes.per.checksum", 512);
+    blockSize -= blockSize % checkSum;
+    blockSize = Math.max(blockSize, checkSum);
+    return blockSize;
+  }
+
+  private static int correctBufferSize(Configuration conf, int bufferSize) {
+    if (bufferSize <= 0)
+      bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    return bufferSize;
+  }
+  
+
+  // Creates path on its namespace; a non-positive bufferSize falls back to io.file.buffer.size, blockSize is checksum-aligned.
+  @Override
+  public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    // the default used to be looked up and thrown away, so a zero bufferSize was handed to hadoop unchanged
+    bufferSize = correctBufferSize(fs.getConf(), bufferSize);
+    return fs.create(path, overwrite, bufferSize, replication, correctBlockSize(fs.getConf(), blockSize));
+  }
+
+  @Override
+  public boolean createNewFile(Path path) throws IOException {
+    FileSystem fs = getFileSystemByPath(path);
+    return fs.createNewFile(path);
+  }
+  @Override
+  public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication, long blockSize) throws IOException {
+    FileSystem fs = getFileSystemByPath(logPath);
+    blockSize = correctBlockSize(fs.getConf(), blockSize);
+    bufferSize = correctBufferSize(fs.getConf(), bufferSize);
+    try {
+      // This... 
+      //    EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+      //    return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
+      // Becomes this:
+      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+      List<Enum<?>> flags = new ArrayList<Enum<?>>();
+      if (createFlags.isEnum()) {
+        for (Object constant : createFlags.getEnumConstants()) {
+          if (constant.toString().equals("SYNC_BLOCK")) {
+            flags.add((Enum<?>)constant);
+            log.debug("Found synch enum " + constant);
+          }
+          if (constant.toString().equals("CREATE")) {
+            flags.add((Enum<?>)constant);
+            log.debug("Found CREATE enum " + constant);
+          }
+        }
+      }
+      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
+      log.debug("CreateFlag set: " + set);
+      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+      return (FSDataOutputStream)create.invoke(fs, logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
+    } catch (ClassNotFoundException ex) {
+      // Expected in hadoop 1.0
+      return fs.create(logPath, true, bufferSize, replication, blockSize);
+    } catch (Exception ex) {
+      log.debug(ex, ex);
+      return fs.create(logPath, true, bufferSize, replication, blockSize);
+    }
+  }
+
+  @Override
+  public boolean delete(Path path)  throws IOException{
+    return getFileSystemByPath(path).delete(path, false);
+  }
+
+  @Override
+  public boolean deleteRecursively(Path path) throws IOException {
+    return getFileSystemByPath(path).delete(path, true);
+  }
+
+  private void ensureSyncIsEnabled() {
+    for (FileSystem fs : getFileSystems()) {
+      if (fs instanceof DistributedFileSystem) {
+        if (!fs.getConf().getBoolean("dfs.durable.sync", false) && !fs.getConf().getBoolean("dfs.support.append", false)) {
+          String msg = "Must set dfs.durable.sync OR dfs.support.append to true.  Which one needs to be set depends on your version of HDFS.  See ACCUMULO-623. \n"
+              + "HADOOP RELEASE          VERSION           SYNC NAME             DEFAULT\n"
+              + "Apache Hadoop           0.20.205          dfs.support.append    false\n"
+              + "Apache Hadoop            0.23.x           dfs.support.append    true\n"
+              + "Apache Hadoop             1.0.x           dfs.support.append    false\n"
+              + "Apache Hadoop             1.1.x           dfs.durable.sync      true\n"
+              + "Apache Hadoop          2.0.0-2.0.2        dfs.support.append    true\n"
+              + "Cloudera CDH             3u0-3u3             ????               true\n"
+              + "Cloudera CDH               3u4            dfs.support.append    true\n"
+              + "Hortonworks HDP           `1.0            dfs.support.append    false\n"
+              + "Hortonworks HDP           `1.1            dfs.support.append    false";
+          log.fatal(msg);
+          System.exit(-1);
+        }
+        try {
+          // if this class exists
+          Class.forName("org.apache.hadoop.fs.CreateFlag");
+          // we're running hadoop 2.0, 1.1
+          if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
+            log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss");
+          }
+        } catch (ClassNotFoundException ex) {
+          // hadoop 1.0
+        }
+      }
+    }
+    
+  }
+
+  @Override
+  public boolean exists(Path path) throws IOException {
+    return getFileSystemByPath(path).exists(path);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    return getFileSystemByPath(path).getFileStatus(path);
+  }
+
+  @Override
+  public FileSystem getFileSystemByPath(Path path) {
+    log.info("Looking up namespace on " + path);
+    return namespaces.get(defaultNamespace);
+  }
+
+  @Override
+  public FileSystem getFileSystemByPath(String path) {
+    return getFileSystemByPath(new Path(path));
+  }
+
+  @Override
+  public Collection<FileSystem> getFileSystems() {
+    return new ArrayList<FileSystem>(namespaces.values());
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path path) throws IOException {
+    return getFileSystemByPath(path).listStatus(path); 
+  }
+
+  @Override
+  public boolean mkdirs(Path path) throws IOException {
+    return getFileSystemByPath(path).mkdirs(path);
+  }
+
+  @Override
+  public FSDataInputStream open(Path path) throws IOException {
+    return getFileSystemByPath(path).open(path);
+  }
+
+  @Override
+  public boolean rename(Path path, Path newPath)throws IOException {
+    FileSystem source = getFileSystemByPath(path);
+    FileSystem dest = getFileSystemByPath(newPath);
+    if (source != dest) {
+      throw new NotImplementedException("Cannot rename files across namespaces: " + path + " -> " + newPath);
+    }
+    return source.rename(path, newPath);
+  }
+
+  @Override
+  public boolean moveToTrash(Path path) throws IOException{
+    FileSystem fs = getFileSystemByPath(path);
+    Trash trash = new Trash(fs, fs.getConf());
+    return trash.moveToTrash(path);
+  }
+
+  @Override
+  public short getDefaultReplication(Path path) {
+    return getFileSystemByPath(path).getDefaultReplication();
+  }
+
+  @Override
+  public boolean isFile(Path path) throws IOException {
+    return getFileSystemByPath(path).isFile(path);
+  }
+
+  public static org.apache.accumulo.server.fs.FileSystem get() throws IOException {
+    AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
+    return new FileSystemImpl(Collections.singletonMap("", FileSystem.get(CachedConfiguration.getInstance())), "", conf);
+  }
+
+  // Returns false while any HDFS namespace reports safe mode; file systems that are not HDFS are skipped.
+  @Override
+  public boolean isReady() throws IOException {
+    for (FileSystem fs : getFileSystems()) {
+      if (!(fs instanceof DistributedFileSystem))
+        continue;
+      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      // Reflective form of dfs.setSafeMode(SafeModeAction.SAFEMODE_GET): the enum lives in a
+      // different enclosing class depending on the hadoop release, so it is looked up by name.
+      Class<?> safeModeAction;
+      try {
+        // hadoop 2.0
+        safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+      } catch (ClassNotFoundException ex) {
+        // hadoop 1.0
+        try {
+          safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException("Cannot figure out the right class for Constants", e);
+        }
+      }
+      Object get = null;
+      for (Object obj : safeModeAction.getEnumConstants()) {
+        if (obj.toString().equals("SAFEMODE_GET"))
+          get = obj;
+      }
+      if (get == null) {
+        throw new RuntimeException("cannot find SAFEMODE_GET");
+      }
+      try {
+        Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+        if ((Boolean) setSafeMode.invoke(dfs, get)) {
+          return false;
+        }
+      } catch (Exception ex) {
+        throw new RuntimeException("cannot find method setSafeMode", ex); // keep the cause: invoke() failures land here too
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public FileSystem getDefaultNamespace() {
+    return namespaces.get(defaultNamespace);
+  }
+
+  @Override
+  public FileStatus[] globStatus(Path pathPattern) throws IOException {
+    return getFileSystemByPath(pathPattern).globStatus(pathPattern);
+  }
+
+  // Builds the fully qualified path string, under the tables directory, for the file named by a metadata key.
+  @Override
+  public String getFullPath(Key key) {
+    String relPath = key.getColumnQualifierData().toString();
+    byte [] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
+    // "../x" keeps "/x" (only the ".." is dropped); any other value is prefixed with "/" + the row's table id
+    if (relPath.startsWith("../"))
+      relPath = relPath.substring(2);
+    else // decode the id explicitly as UTF-8 rather than with the platform default charset
+      relPath = "/" + new String(tableId, java.nio.charset.Charset.forName("UTF-8")) + relPath;
+    String fullPath = Constants.getTablesDir(conf) + relPath;
+    FileSystem ns = getFileSystemByPath(fullPath);
+    return ns.makeQualified(new Path(fullPath)).toString();
+  }
+
+}

Propchange: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Fri May 24 19:59:11 2013
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.util.ThriftUtil;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.MetadataTable;
@@ -47,9 +48,7 @@ import org.apache.accumulo.trace.instrum
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
@@ -60,13 +59,11 @@ public class GarbageCollectWriteAheadLog
   private final Instance instance;
   private final FileSystem fs;
   
-  private Trash trash;
+  private boolean useTrash;
   
-  GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs, boolean noTrash) throws IOException {
+  GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs, boolean useTrash) throws IOException {
     this.instance = instance;
     this.fs = fs;
-    if (!noTrash)
-      this.trash = new Trash(fs, fs.getConf());
   }
   
   public void collect(GCStatus status) {
@@ -139,8 +136,8 @@ public class GarbageCollectWriteAheadLog
           log.debug("Removing old-style WAL " + entry.getValue());
           try {
             Path path = new Path(Constants.getWalDirectory(conf), filename);
-            if (trash == null || !trash.moveToTrash(path))
-              fs.delete(path, true);
+            if (!useTrash || !fs.moveToTrash(path))
+              fs.deleteRecursively(path);
             status.currentLog.deleted++;
           } catch (FileNotFoundException ex) {
             // ignored
@@ -156,8 +153,8 @@ public class GarbageCollectWriteAheadLog
             log.debug("Removing WAL for offline server " + filename);
             try {
               Path path = new Path(serverPath, filename);
-              if (trash == null || !trash.moveToTrash(path))
-                fs.delete(path, true);
+              if (!useTrash || !fs.moveToTrash(path))
+                fs.deleteRecursively(path);
               status.currentLog.deleted++;
             } catch (FileNotFoundException ex) {
               // ignored
@@ -189,8 +186,8 @@ public class GarbageCollectWriteAheadLog
       log.debug("Removing sorted WAL " + sortedWALog);
       Path swalog = new Path(recoveryDir, sortedWALog);
       try {
-        if (trash == null || !trash.moveToTrash(swalog)) {
-          fs.delete(swalog, true);
+        if (!useTrash || !fs.moveToTrash(swalog)) {
+          fs.deleteRecursively(swalog);
         }
       } catch (FileNotFoundException ex) {
         // ignored

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Fri May 24 19:59:11 2013
@@ -56,7 +56,6 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
 import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
@@ -65,7 +64,6 @@ import org.apache.accumulo.core.master.s
 import org.apache.accumulo.core.security.CredentialHelper;
 import org.apache.accumulo.core.security.SecurityUtil;
 import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.ServerServices;
 import org.apache.accumulo.core.util.ServerServices.Service;
@@ -77,11 +75,11 @@ import org.apache.accumulo.server.Accumu
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.accumulo.server.master.state.tables.TableManager;
 import org.apache.accumulo.server.security.SecurityConstants;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.OfflineMetadataScanner;
 import org.apache.accumulo.server.util.TServerUtils;
 import org.apache.accumulo.server.util.TabletIterator;
 import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -92,9 +90,7 @@ import org.apache.accumulo.trace.instrum
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
@@ -126,7 +122,7 @@ public class SimpleGarbageCollector impl
   private long gcStartDelay;
   private boolean checkForBulkProcessingFiles;
   private FileSystem fs;
-  private Trash trash = null;
+  private boolean useTrash = true;
   private boolean safemode = false, offline = false, verbose = false;
   private String address = "localhost";
   private ZooLock lock;
@@ -143,7 +139,7 @@ public class SimpleGarbageCollector impl
     
     Instance instance = HdfsZooInstance.getInstance();
     ServerConfiguration serverConf = new ServerConfiguration(instance);
-    final FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), serverConf.getConfiguration());
+    final FileSystem fs = FileSystemImpl.get();
     Accumulo.init(fs, serverConf, "gc");
     String address = "localhost";
     SimpleGarbageCollector gc = new SimpleGarbageCollector();
@@ -183,7 +179,7 @@ public class SimpleGarbageCollector impl
   }
 
   public void init(FileSystem fs, Instance instance, TCredentials credentials, boolean noTrash) throws IOException {
-    this.fs = TraceFileSystem.wrap(fs);
+    this.fs = fs;
     this.credentials = credentials;
     this.instance = instance;
     
@@ -197,9 +193,7 @@ public class SimpleGarbageCollector impl
     log.info("verbose: " + verbose);
     log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory() + " bytes");
     log.info("delete threads: " + numDeleteThreads);
-    if (!noTrash) {
-      this.trash = new Trash(fs, fs.getConf());
-    }
+    useTrash = !noTrash;
   }
   
   private void run() {
@@ -299,7 +293,7 @@ public class SimpleGarbageCollector impl
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
-        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, trash == null);
+        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, useTrash);
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
       } catch (Exception e) {
@@ -329,10 +323,10 @@ public class SimpleGarbageCollector impl
   }
   
   private boolean moveToTrash(Path path) throws IOException {
-    if (trash == null)
+    if (!useTrash)
       return false;
     try {
-      return trash.moveToTrash(path);
+      return fs.moveToTrash(path);
     } catch (FileNotFoundException ex) {
       return false;
     }
@@ -377,7 +371,7 @@ public class SimpleGarbageCollector impl
       if (tabletDirs.length == 0) {
         Path p = new Path(ServerConstants.getTablesDir() + "/" + delTableId);
         if (!moveToTrash(p)) 
-          fs.delete(p, false);
+          fs.delete(p);
       }
     }
   }
@@ -506,11 +500,13 @@ public class SimpleGarbageCollector impl
     
     Scanner scanner;
     if (offline) {
-      try {
-        scanner = new OfflineMetadataScanner(instance.getConfiguration(), fs);
-      } catch (IOException e) {
-        throw new IllegalStateException("Unable to create offline metadata scanner", e);
-      }
+      // TODO
+      throw new RuntimeException("Offline scanner no longer supported");
+//      try {
+//        scanner = new OfflineMetadataScanner(instance.getConfiguration(), fs);
+//      } catch (IOException e) {
+//        throw new IllegalStateException("Unable to create offline metadata scanner", e);
+//      }
     } else {
       try {
         scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), CredentialHelper.extractToken(credentials)).createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
@@ -668,7 +664,7 @@ public class SimpleGarbageCollector impl
             
             Path p = new Path(fullPath);
             
-            if (moveToTrash(p) || fs.delete(p, true)) {
+            if (moveToTrash(p) || fs.deleteRecursively(p)) {
               // delete succeeded, still want to delete
               removeFlag = true;
               synchronized (SimpleGarbageCollector.this) {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Fri May 24 19:59:11 2013
@@ -30,15 +30,11 @@ import java.util.regex.Pattern;
 import org.apache.accumulo.core.cli.Help;
 import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 import org.apache.accumulo.server.tabletserver.log.MultiReader;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
@@ -70,9 +66,7 @@ public class LogReader {
   public static void main(String[] args) throws IOException {
     Opts opts = new Opts();
     opts.parseArgs(LogReader.class.getName(), args);
-    Configuration conf = CachedConfiguration.getInstance();
-    FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
-    FileSystem local = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+    FileSystem fs = FileSystemImpl.get();
     
     Matcher rowMatcher = null;
     KeyExtent ke = null;
@@ -117,25 +111,9 @@ public class LogReader {
         } finally {
           f.close();
         }
-      } else if (local.isFile(path)) {
-        // read log entries from a simple file
-        FSDataInputStream f = DfsLogger.readHeader(fs, path, meta);
-        try {
-          while (true) {
-            try {
-              key.readFields(f);
-              value.readFields(f);
-            } catch (EOFException ex) {
-              break;
-            }
-            printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-          }
-        } finally {
-          f.close();
-        }
       } else {
         // read the log entries sorted in a map file
-        MultiReader input = new MultiReader(fs, conf, file);
+        MultiReader input = new MultiReader(fs, file);
         while (input.next(key, value)) {
           printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
         }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java Fri May 24 19:59:11 2013
@@ -66,7 +66,6 @@ import org.apache.accumulo.core.data.Par
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.thrift.TKeyExtent;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -83,7 +82,6 @@ import org.apache.accumulo.core.security
 import org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -98,6 +96,8 @@ import org.apache.accumulo.fate.zookeepe
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
 import org.apache.accumulo.server.master.balancer.TabletBalancer;
@@ -141,7 +141,6 @@ import org.apache.accumulo.server.securi
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.SecurityOperation;
 import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.AddressUtil;
 import org.apache.accumulo.server.util.DefaultMap;
 import org.apache.accumulo.server.util.Halt;
@@ -156,7 +155,6 @@ import org.apache.accumulo.server.zookee
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -452,7 +450,7 @@ public class Master implements LiveTServ
   public Master(ServerConfiguration config, FileSystem fs, String hostname) throws IOException {
     this.serverConfig = config;
     this.instance = config.getInstance();
-    this.fs = TraceFileSystem.wrap(fs);
+    this.fs = fs;
     this.hostname = hostname;
     
     AccumuloConfiguration aconf = serverConfig.getConfiguration();
@@ -2229,7 +2227,7 @@ public class Master implements LiveTServ
     try {
       SecurityUtil.serverLogin();
       
-      FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+      FileSystem fs = FileSystemImpl.get();
       String hostname = Accumulo.getLocalAddress(args);
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java Fri May 24 19:59:11 2013
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.server.master.Master;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -33,9 +33,9 @@ public class HadoopLogCloser implements 
 
   @Override
   public long close(Master master, FileSystem fs, Path source) throws IOException {
-    
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(source);
+    if (ns instanceof DistributedFileSystem) {
+      DistributedFileSystem dfs = (DistributedFileSystem) ns;
       try {
         if (!dfs.recoverLease(source)) {
           log.info("Waiting for file to be closed " + source.toString());
@@ -48,12 +48,12 @@ public class HadoopLogCloser implements 
       } catch (Exception ex) {
         log.warn("Error recovery lease on " + source.toString(), ex);
       }
-    } else if (fs instanceof LocalFileSystem) {
+    } else if (ns instanceof LocalFileSystem) {
       // ignore
     } else {
       throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
     }
-    fs.append(source).close();
+    ns.append(source).close();
     log.info("Recovered lease on " + source.toString() + " using append");
     return 0;
   }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java Fri May 24 19:59:11 2013
@@ -19,7 +19,7 @@ package org.apache.accumulo.server.maste
 import java.io.IOException;
 
 import org.apache.accumulo.server.master.Master;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 public interface LogCloser {

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java Fri May 24 19:59:11 2013
@@ -19,7 +19,7 @@ package org.apache.accumulo.server.maste
 import java.io.IOException;
 
 import org.apache.accumulo.server.master.Master;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.log4j.Logger;
@@ -31,9 +31,10 @@ public class MapRLogCloser implements Lo
   @Override
   public long close(Master m, FileSystem fs, Path path) throws IOException {
     log.info("Recovering file " + path.toString() + " by changing permission to readonly");
+    org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
     FsPermission roPerm = new FsPermission((short) 0444);
     try {
-      fs.setPermission(path, roPerm);
+      ns.setPermission(path, roPerm);
       return 0;
     } catch (IOException ex) {
       log.error("error recovering lease ", ex);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java Fri May 24 19:59:11 2013
@@ -35,10 +35,9 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
@@ -82,8 +81,6 @@ public class RecoveryManager {
       boolean rescheduled = false;
       try {
         FileSystem localFs = master.getFileSystem();
-        if (localFs instanceof TraceFileSystem)
-          localFs = ((TraceFileSystem) localFs).getImplementation();
       
         long time = closer.close(master, localFs, getSource(host, filename));
       

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java Fri May 24 19:59:11 2013
@@ -17,17 +17,15 @@
 package org.apache.accumulo.server.master.state;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.master.thrift.MasterGoalState;
 import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.FileSystem;
 
 public class SetGoalState {
   
@@ -41,7 +39,7 @@ public class SetGoalState {
     }
     SecurityUtil.serverLogin();
 
-    FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+    FileSystem fs = FileSystemImpl.get();
     Accumulo.waitForZookeeperAndHdfs(fs);
     ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(),
         NodeExistsPolicy.OVERWRITE);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Fri May 24 19:59:11 2013
@@ -74,7 +74,7 @@ import org.apache.accumulo.server.zookee
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.Text;
@@ -440,7 +440,7 @@ class CopyFailed extends MasterRepo {
       bifCopyQueue.waitUntilDone(workIds);
     }
 
-    fs.delete(new Path(error, BulkImport.FAILURES_TXT), true);
+    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
     return new CleanUpBulkImport(tableId, source, bulk, error);
   }
   
@@ -500,12 +500,12 @@ class LoadFiles extends MasterRepo {
     Path writable = new Path(this.errorDir, ".iswritable");
     if (!fs.createNewFile(writable)) {
       // Maybe this is a re-try... clear the flag and try again
-      fs.delete(writable, false);
+      fs.delete(writable);
       if (!fs.createNewFile(writable))
         throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
             "Unable to write to " + this.errorDir);
     }
-    fs.delete(writable, false);
+    fs.delete(writable);
     
     final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
     for (FileStatus f : files)

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java Fri May 24 19:59:11 2013
@@ -22,7 +22,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
-import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -47,12 +46,11 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.master.Master;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
@@ -169,7 +167,7 @@ class WriteExportFiles extends MasterRep
       exportConfig(conn, tableID, zipOut, dataOut);
       dataOut.flush();
       
-      Map<String,String> uniqueFiles = exportMetadata(conn, tableID, zipOut, dataOut);
+      Map<String,String> uniqueFiles = exportMetadata(fs, conn, tableID, zipOut, dataOut);
       
       dataOut.close();
       dataOut = null;
@@ -186,20 +184,12 @@ class WriteExportFiles extends MasterRep
     BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false)));
     
     try {
-      URI uri = fs.getUri();
-      
-      for (String relPath : uniqueFiles.values()) {
-        Path absPath = new Path(uri.getScheme(), uri.getAuthority(), ServerConstants.getTablesDir() + relPath);
-        distcpOut.append(absPath.toUri().toString());
+      for (String file : uniqueFiles.values()) {
+        distcpOut.append(file);
         distcpOut.newLine();
       }
       
-      Path absEMP = exportMetaFilePath;
-      if (!exportMetaFilePath.isAbsolute())
-        absEMP = new Path(fs.getWorkingDirectory().toUri().getPath(), exportMetaFilePath);
-      
-      distcpOut.append(new Path(uri.getScheme(), uri.getAuthority(), absEMP.toString()).toUri().toString());
-      
+      distcpOut.append(exportMetaFilePath.toString());
       distcpOut.newLine();
       
       distcpOut.close();
@@ -211,7 +201,7 @@ class WriteExportFiles extends MasterRep
     }
   }
   
-  private static Map<String,String> exportMetadata(Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
+  private static Map<String,String> exportMetadata(FileSystem fs, Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
       TableNotFoundException {
     zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE));
     
@@ -228,24 +218,18 @@ class WriteExportFiles extends MasterRep
       entry.getValue().write(dataOut);
       
       if (entry.getKey().getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
-        String relPath = entry.getKey().getColumnQualifierData().toString();
-        
-        if (relPath.startsWith("../"))
-          relPath = relPath.substring(2);
-        else
-          relPath = "/" + tableID + relPath;
-        
-        String tokens[] = relPath.split("/");
-        if (tokens.length != 4) {
-          throw new RuntimeException("Illegal path " + relPath);
+        String path = fs.getFullPath(entry.getKey());
+        String tokens[] = path.split("/");
+        if (tokens.length < 1) {
+          throw new RuntimeException("Illegal path " + path);
         }
         
-        String filename = tokens[3];
+        String filename = tokens[tokens.length - 1];
         
         String existingPath = uniqueFiles.get(filename);
         if (existingPath == null) {
-          uniqueFiles.put(filename, relPath);
-        } else if (!existingPath.equals(relPath)) {
+          uniqueFiles.put(filename, path);
+        } else if (!existingPath.equals(path)) {
           // make sure file names are unique, should only apply for tables with file names generated by Accumulo 1.3 and earlier
           throw new IOException("Cannot export table with nonunique file names " + filename + ". Major compact table.");
         }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java Fri May 24 19:59:11 2013
@@ -61,7 +61,7 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.TablePropUtil;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
@@ -98,7 +98,7 @@ class FinishImportTable extends MasterRe
   @Override
   public Repo<Master> call(long tid, Master env) throws Exception {
     
-    env.getFileSystem().delete(new Path(tableInfo.importDir, "mappings.txt"), true);
+    env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir, "mappings.txt"));
     
     TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE);
     
@@ -365,7 +365,7 @@ class MapImportFileNames extends MasterR
   
   @Override
   public void undo(long tid, Master env) throws Exception {
-    env.getFileSystem().delete(new Path(tableInfo.importDir), true);
+    env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir));
   }
 }
 
@@ -414,7 +414,8 @@ class ImportPopulateZookeeper extends Ma
     Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
     
     try {
-      return TableOperationsImpl.getExportedProps(fs, path);
+      org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
+      return TableOperationsImpl.getExportedProps(ns, path);
     } catch (IOException ioe) {
       throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
           "Error reading table props from " + path + " " + ioe.getMessage());

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java Fri May 24 19:59:11 2013
@@ -27,13 +27,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.impl.MasterClient;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.gc.thrift.GCMonitorService;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.master.thrift.Compacting;
@@ -42,7 +40,6 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.master.thrift.TableInfo;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
 import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.Pair;
@@ -54,6 +51,8 @@ import org.apache.accumulo.core.zookeepe
 import org.apache.accumulo.server.Accumulo;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.accumulo.server.monitor.servlets.DefaultServlet;
 import org.apache.accumulo.server.monitor.servlets.GcStatusServlet;
 import org.apache.accumulo.server.monitor.servlets.JSONServlet;
@@ -73,7 +72,7 @@ import org.apache.accumulo.server.proble
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.util.EmbeddedWebServer;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -450,7 +449,7 @@ public class Monitor {
   public static void main(String[] args) throws Exception {
     SecurityUtil.serverLogin();
     
-    FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+    FileSystem fs = FileSystemImpl.get();
     String hostname = Accumulo.getLocalAddress(args);
     instance = HdfsZooInstance.getInstance();
     config = new ServerConfiguration(instance);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java Fri May 24 19:59:11 2013
@@ -43,12 +43,12 @@ import org.apache.accumulo.core.iterator
 import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
 import org.apache.accumulo.server.problems.ProblemReports;
 import org.apache.accumulo.server.problems.ProblemType;
 import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
@@ -305,7 +305,8 @@ public class FileManager {
     for (String file : filesToOpen) {
       try {
         // log.debug("Opening "+file);
-        FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, fs.getConf(), conf.getTableConfiguration(table.toString()),
+        org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(file);
+        FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
             dataCache, indexCache);
         reservedFiles.add(reader);
         readersReserved.put(reader, file);

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Fri May 24 19:59:11 2013
@@ -1422,7 +1422,7 @@ public class Tablet {
         for (String relPath : datafiles.keySet())
           absPaths.add(rel2abs(relPath, extent));
         
-        tabletServer.recover(this, logEntries, absPaths, new MutationReceiver() {
+        tabletServer.recover(this.tabletServer.getFileSystem(), this, logEntries, absPaths, new MutationReceiver() {
           public void receive(Mutation m) {
             // LogReader.printMutation(m);
             Collection<ColumnUpdate> muts = m.getUpdates();

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Fri May 24 19:59:11 2013
@@ -18,7 +18,6 @@ package org.apache.accumulo.server.table
 
 import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
 
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -46,7 +45,6 @@ import java.util.SortedSet;
 import java.util.TimerTask;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
@@ -97,7 +95,6 @@ import org.apache.accumulo.core.data.thr
 import org.apache.accumulo.core.data.thrift.TMutation;
 import org.apache.accumulo.core.data.thrift.TRange;
 import org.apache.accumulo.core.data.thrift.UpdateErrors;
-import org.apache.accumulo.core.file.FileUtil;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.master.thrift.Compacting;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -120,7 +117,6 @@ import org.apache.accumulo.core.tabletse
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
@@ -143,8 +139,8 @@ import org.apache.accumulo.server.client
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.data.ServerMutation;
-import org.apache.accumulo.server.logger.LogFileKey;
-import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
 import org.apache.accumulo.server.master.state.TServerInstance;
@@ -182,7 +178,6 @@ import org.apache.accumulo.server.tablet
 import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
 import org.apache.accumulo.server.tabletserver.metrics.TabletServerScanMetrics;
 import org.apache.accumulo.server.tabletserver.metrics.TabletServerUpdateMetrics;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.FileSystemMonitor;
 import org.apache.accumulo.server.util.Halt;
 import org.apache.accumulo.server.util.MapCounter;
@@ -205,14 +200,7 @@ import org.apache.accumulo.trace.instrum
 import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
 import org.apache.accumulo.trace.thrift.TInfo;
 import org.apache.commons.collections.map.LRUMap;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -246,7 +234,7 @@ public class TabletServer extends Abstra
     super();
     this.serverConfig = conf;
     this.instance = conf.getInstance();
-    this.fs = TraceFileSystem.wrap(fs);
+    this.fs = fs;
     this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
     SimpleTimer.getInstance().schedule(new Runnable() {
       @Override
@@ -2091,13 +2079,12 @@ public class TabletServer extends Abstra
               log.error("rename is unsuccessful");
           } else {
             log.info("Deleting walog " + filename);
-            Trash trash = new Trash(fs, fs.getConf());
             Path sourcePath = new Path(source);
-            if (!trash.moveToTrash(sourcePath) && !fs.delete(sourcePath, true))
+            if (!fs.moveToTrash(sourcePath) && !fs.deleteRecursively(sourcePath))
               log.warn("Failed to delete walog " + source);
             Path recoveryPath = new Path(Constants.getRecoveryDir(acuConf), filename);
             try {
-              if (trash.moveToTrash(recoveryPath) || fs.delete(recoveryPath, true))
+              if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath))
                 log.info("Deleted any recovery log " + filename);
             } catch (FileNotFoundException ex) {
               // ignore
@@ -3220,13 +3207,11 @@ public class TabletServer extends Abstra
   public static void main(String[] args) throws IOException {
     try {
       SecurityUtil.serverLogin();
-      FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+      FileSystem fs = FileSystemImpl.get();
       String hostname = Accumulo.getLocalAddress(args);
       Instance instance = HdfsZooInstance.getInstance();
       ServerConfiguration conf = new ServerConfiguration(instance);
       Accumulo.init(fs, conf, "tserver");
-      ensureHdfsSyncIsEnabled(fs);
-      recoverLocalWriteAheadLogs(fs, conf);
       TabletServer server = new TabletServer(conf, fs);
       server.config(hostname);
       Accumulo.enableTracing(hostname, "tserver");
@@ -3236,91 +3221,6 @@ public class TabletServer extends Abstra
     }
   }
   
-  private static void ensureHdfsSyncIsEnabled(FileSystem fs) {
-    if (fs instanceof DistributedFileSystem) {
-      if (!fs.getConf().getBoolean("dfs.durable.sync", false) && !fs.getConf().getBoolean("dfs.support.append", false)) {
-        String msg = "Must set dfs.durable.sync OR dfs.support.append to true.  Which one needs to be set depends on your version of HDFS.  See ACCUMULO-623. \n"
-            + "HADOOP RELEASE          VERSION           SYNC NAME             DEFAULT\n"
-            + "Apache Hadoop           0.20.205          dfs.support.append    false\n"
-            + "Apache Hadoop            0.23.x           dfs.support.append    true\n"
-            + "Apache Hadoop             1.0.x           dfs.support.append    false\n"
-            + "Apache Hadoop             1.1.x           dfs.durable.sync      true\n"
-            + "Apache Hadoop          2.0.0-2.0.2        dfs.support.append    true\n"
-            + "Cloudera CDH             3u0-3u3             ????               true\n"
-            + "Cloudera CDH               3u4            dfs.support.append    true\n"
-            + "Hortonworks HDP           `1.0            dfs.support.append    false\n"
-            + "Hortonworks HDP           `1.1            dfs.support.append    false";
-        log.fatal(msg);
-        System.exit(-1);
-      }
-      try {
-        // if this class exists
-        Class.forName("org.apache.hadoop.fs.CreateFlag");
-        // we're running hadoop 2.0, 1.1
-        if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
-          log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss");
-        }
-      } catch (ClassNotFoundException ex) {
-        // hadoop 1.0
-      }
-    }
-    
-  }
-  
-  /**
-   * Copy local walogs into HDFS on an upgrade
-   * 
-   */
-  public static void recoverLocalWriteAheadLogs(FileSystem fs, ServerConfiguration serverConf) throws IOException {
-    FileSystem localfs = FileSystem.getLocal(fs.getConf()).getRawFileSystem();
-    AccumuloConfiguration conf = serverConf.getConfiguration();
-    String localWalDirectories = conf.get(Property.LOGGER_DIR);
-    for (String localWalDirectory : localWalDirectories.split(",")) {
-      if (!localWalDirectory.startsWith("/")) {
-        localWalDirectory = System.getenv("ACCUMULO_HOME") + "/" + localWalDirectory;
-      }
-      
-      FileStatus status = null;
-      try {
-        status = localfs.getFileStatus(new Path(localWalDirectory));
-      } catch (FileNotFoundException fne) {}
-      
-      if (status == null || !status.isDir()) {
-        log.debug("Local walog dir " + localWalDirectory + " not found ");
-        continue;
-      }
-      
-      for (FileStatus file : localfs.listStatus(new Path(localWalDirectory))) {
-        String name = file.getPath().getName();
-        try {
-          UUID.fromString(name);
-        } catch (IllegalArgumentException ex) {
-          log.info("Ignoring non-log file " + name + " in " + localWalDirectory);
-          continue;
-        }
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-        log.info("Openning local log " + file.getPath());
-        Reader reader = new SequenceFile.Reader(localfs, file.getPath(), localfs.getConf());
-        Path tmp = new Path(Constants.getWalDirectory(conf) + "/" + name + ".copy");
-        FSDataOutputStream writer = fs.create(tmp);
-        while (reader.next(key, value)) {
-          try {
-            key.write(writer);
-            value.write(writer);
-          } catch (EOFException ex) {
-            break;
-          }
-        }
-        writer.close();
-        reader.close();
-        fs.rename(tmp, new Path(tmp.getParent(), name));
-        log.info("Copied local log " + name);
-        localfs.delete(new Path(localWalDirectory, name), true);
-      }
-    }
-  }
-  
   public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
     totalMinorCompactions++;
     logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
@@ -3330,7 +3230,7 @@ public class TabletServer extends Abstra
     logger.minorCompactionStarted(tablet, lastUpdateSequence, newMapfileLocation);
   }
   
-  public void recover(Tablet tablet, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
+  public void recover(FileSystem fs, Tablet tablet, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
     List<String> recoveryLogs = new ArrayList<String>();
     List<LogEntry> sorted = new ArrayList<LogEntry>(logEntries);
     Collections.sort(sorted, new Comparator<LogEntry>() {
@@ -3355,7 +3255,7 @@ public class TabletServer extends Abstra
         throw new IOException("Unable to find recovery files for extent " + tablet.getExtent() + " logEntry: " + entry);
       recoveryLogs.add(recovery);
     }
-    logger.recover(tablet, recoveryLogs, tabletFiles, mutationReceiver);
+    logger.recover(fs, tablet, recoveryLogs, tabletFiles, mutationReceiver);
   }
   
   private final AtomicInteger logIdGenerator = new AtomicInteger();
@@ -3561,5 +3461,9 @@ public class TabletServer extends Abstra
       }
     };
   }
+
+  public FileSystem getFileSystem() {
+    return fs;
+  }
   
 }

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Fri May 24 19:59:11 2013
@@ -48,12 +48,12 @@ import org.apache.accumulo.core.util.Met
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
 import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
 import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Logger;
 
 /**

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Fri May 24 19:59:11 2013
@@ -29,7 +29,6 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,19 +43,14 @@ import org.apache.accumulo.core.data.Key
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tabletserver.TabletMutations;
-import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.fs.CreateFlag;
-//import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
 import org.apache.log4j.Logger;
 
 /**
@@ -258,17 +252,14 @@ public class DfsLogger {
       FileSystem fs = conf.getFileSystem();
       short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
       if (replication == 0)
-        replication = fs.getDefaultReplication();
+        replication = fs.getDefaultReplication(logPath);
       long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
       if (blockSize == 0)
         blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
-      int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
-      blockSize -= blockSize % checkSum;
-      blockSize = Math.max(blockSize, checkSum);
       if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
-        logFile = create(fs, logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+        logFile = fs.createSyncable(logPath, 0, replication, blockSize);
       else
-        logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+        logFile = fs.create(logPath, true, 0, replication, blockSize);
       
       try {
         // sync: send data to datanodes
@@ -328,43 +319,6 @@ public class DfsLogger {
     t.start();
   }
   
-  private FSDataOutputStream create(FileSystem fs, Path logPath, boolean b, int buffersize, short replication, long blockSize) throws IOException {
-    try {
-      // This... 
-      //    EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
-      //    return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
-      // Becomes this:
-      Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
-      List<Enum<?>> flags = new ArrayList<Enum<?>>();
-      if (createFlags.isEnum()) {
-        for (Object constant : createFlags.getEnumConstants()) {
-          if (constant.toString().equals("SYNC_BLOCK")) {
-            flags.add((Enum<?>)constant);
-            log.debug("Found synch enum " + constant);
-          }
-          if (constant.toString().equals("CREATE")) {
-            flags.add((Enum<?>)constant);
-            log.debug("Found CREATE enum " + constant);
-          }
-        }
-      }
-      Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
-      log.debug("CreateFlag set: " + set);
-      if (fs instanceof TraceFileSystem) {
-        fs = ((TraceFileSystem)fs).getImplementation();
-      }
-      Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
-      log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
-      return (FSDataOutputStream)create.invoke(fs, logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
-    } catch (ClassNotFoundException ex) {
-      // Expected in hadoop 1.0
-      return fs.create(logPath, b, buffersize, replication, blockSize);
-    } catch (Exception ex) {
-      log.debug(ex, ex);
-      return fs.create(logPath, b, buffersize, replication, blockSize);
-    }
-  }
-
   /*
    * (non-Javadoc)
    * 

Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Fri May 24 19:59:11 2013
@@ -37,12 +37,12 @@ import org.apache.accumulo.core.master.t
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.fs.FileSystem;
 import org.apache.accumulo.server.logger.LogFileKey;
 import org.apache.accumulo.server.logger.LogFileValue;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
 import org.apache.log4j.Logger;
@@ -104,7 +104,7 @@ public class LogSorter {
       try {
         
         // the following call does not throw an exception if the file/dir does not exist
-        fs.delete(new Path(destPath), true);
+        fs.deleteRecursively(new Path(destPath));
         
         FSDataInputStream tmpInput = fs.open(srcPath);
         DataInputStream tmpDecryptingInput = tmpInput;
@@ -193,7 +193,8 @@ public class LogSorter {
     
     private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
       String path = destPath + String.format("/part-r-%05d", part++);
-      MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
+      org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
+      MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path, LogFileKey.class, LogFileValue.class);
       try {
         Collections.sort(buffer, new Comparator<Pair<LogFileKey,LogFileValue>>() {
           @Override