You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/05/24 21:59:11 UTC
svn commit: r1486202 [1/2] - in /accumulo/branches/ACCUMULO-118:
core/src/main/java/org/apache/accumulo/core/conf/
server/src/main/java/org/apache/accumulo/server/
server/src/main/java/org/apache/accumulo/server/fs/
server/src/main/java/org/apache/accu...
Author: ecn
Date: Fri May 24 19:59:11 2013
New Revision: 1486202
URL: http://svn.apache.org/r1486202
Log:
ACCUMULO-118 route the servers to new FileSystem interface; unit tests and functional tests still run
Added:
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java (with props)
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java (with props)
Modified:
accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/MultiReader.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecovery.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java
accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
accumulo/branches/ACCUMULO-118/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
accumulo/branches/ACCUMULO-118/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java
Modified: accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/ACCUMULO-118/core/src/main/java/org/apache/accumulo/core/conf/Property.java Fri May 24 19:59:11 2013
@@ -70,6 +70,8 @@ public enum Property {
"A secret unique to a given instance that all servers must know in order to communicate with one another."
+ " Change it before initialization. To change it later use ./bin/accumulo accumulo.server.util.ChangeSecret [oldpasswd] [newpasswd], "
+ " and then update conf/accumulo-site.xml everywhere."),
+ INSTANCE_NAMESPACES("instance.namespaces", "", PropertyType.STRING,
+ "A list of namespaces to use."),
INSTANCE_SECURITY_AUTHENTICATOR("instance.security.authenticator", "org.apache.accumulo.server.security.handler.ZKAuthenticator", PropertyType.CLASSNAME,
"The authenticator class that accumulo will use to determine if a user has privilege to perform an action"),
INSTANCE_SECURITY_AUTHORIZOR("instance.security.authorizor", "org.apache.accumulo.server.security.handler.ZKAuthorizor", PropertyType.CLASSNAME,
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/Accumulo.java Fri May 24 19:59:11 2013
@@ -20,7 +20,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map.Entry;
@@ -33,12 +32,11 @@ import org.apache.accumulo.core.util.Uti
import org.apache.accumulo.core.util.Version;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.log4j.Logger;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.xml.DOMConfigurator;
@@ -52,7 +50,7 @@ public class Accumulo {
try {
if (getAccumuloPersistentVersion(fs) == Constants.PREV_DATA_VERSION) {
fs.create(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.DATA_VERSION));
- fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.PREV_DATA_VERSION), false);
+ fs.delete(new Path(ServerConstants.getDataVersionLocation() + "/" + Constants.PREV_DATA_VERSION));
}
} catch (IOException e) {
throw new RuntimeException("Unable to set accumulo version: an error occurred.", e);
@@ -62,7 +60,7 @@ public class Accumulo {
public static synchronized int getAccumuloPersistentVersion(FileSystem fs) {
int dataVersion;
try {
- FileStatus[] files = fs.listStatus(ServerConstants.getDataVersionLocation());
+ FileStatus[] files = fs.getDefaultNamespace().listStatus(ServerConstants.getDataVersionLocation());
if (files == null || files.length == 0) {
dataVersion = -1; // assume it is 0.5 or earlier
} else {
@@ -198,7 +196,7 @@ public class Accumulo {
long sleep = 1000;
while (true) {
try {
- if (!isInSafeMode(fs))
+ if (fs.isReady())
break;
log.warn("Waiting for the NameNode to leave safemode");
} catch (IOException ex) {
@@ -211,37 +209,4 @@ public class Accumulo {
log.info("Connected to HDFS");
}
- private static boolean isInSafeMode(FileSystem fs) throws IOException {
- if (!(fs instanceof DistributedFileSystem))
- return false;
- DistributedFileSystem dfs = (DistributedFileSystem)fs;
- // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
- // Becomes this:
- Class<?> safeModeAction;
- try {
- // hadoop 2.0
- safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
- } catch (ClassNotFoundException ex) {
- // hadoop 1.0
- try {
- safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Cannot figure out the right class for Constants");
- }
- }
- Object get = null;
- for (Object obj : safeModeAction.getEnumConstants()) {
- if (obj.toString().equals("SAFEMODE_GET"))
- get = obj;
- }
- if (get == null) {
- throw new RuntimeException("cannot find SAFEMODE_GET");
- }
- try {
- Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
- return (Boolean) setSafeMode.invoke(dfs, get);
- } catch (Exception ex) {
- throw new RuntimeException("cannot find method setSafeMode");
- }
- }
}
Added: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java?rev=1486202&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java (added)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java Fri May 24 19:59:11 2013
@@ -0,0 +1,64 @@
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Server-side file system abstraction used in place of direct org.apache.hadoop.fs.FileSystem calls.
+ * The descriptions below reflect what FileSystemImpl (added in this same commit) does for each method.
+ */
+public interface FileSystem {
+
+ /** Closes the underlying Hadoop file system(s). */
+ void close() throws IOException;
+
+ /** Attempts to close a file that a (possibly dead) writer may still hold open; FileSystemImpl uses lease recovery on HDFS. */
+ boolean closePossiblyOpenFile(Path path) throws IOException;
+
+ /** Creates a file at dest using the underlying file system's defaults. */
+ FSDataOutputStream create(Path dest) throws IOException;
+
+ /** Creates a file; b is the overwrite flag passed to the underlying file system. */
+ FSDataOutputStream create(Path path, boolean b) throws IOException;
+
+ /** Creates a file; b is the overwrite flag, int1 the buffer size, int2 the replication and long1 the block size. */
+ FSDataOutputStream create(Path path, boolean b, int int1, short int2, long long1) throws IOException;
+
+ /** Delegates to the underlying file system's createNewFile for the given path. */
+ boolean createNewFile(Path writable) throws IOException;
+
+ /** Creates a log file, requesting the SYNC_BLOCK create flag when the running Hadoop version provides it. */
+ FSDataOutputStream createSyncable(Path logPath, int buffersize, short replication, long blockSize) throws IOException;
+
+ /** Deletes a path non-recursively. */
+ boolean delete(Path path) throws IOException;
+
+ /** Deletes a path and everything beneath it. */
+ boolean deleteRecursively(Path path) throws IOException;
+
+ /** Returns whether the given path exists. */
+ boolean exists(Path newBulkDir) throws IOException;
+
+ /** Returns the status of the given path. */
+ FileStatus getFileStatus(Path errorPath) throws IOException;
+
+ /** Returns the underlying Hadoop file system responsible for the given path. */
+ org.apache.hadoop.fs.FileSystem getFileSystemByPath(Path path);
+
+ /** String variant of {@link #getFileSystemByPath(Path)}. */
+ org.apache.hadoop.fs.FileSystem getFileSystemByPath(String path);
+
+ /** Returns all underlying Hadoop file systems. */
+ Collection<org.apache.hadoop.fs.FileSystem> getFileSystems();
+
+ /** Lists the entries under the given path. */
+ FileStatus[] listStatus(Path path) throws IOException;
+
+ /** Creates the directory via the underlying file system's mkdirs. */
+ boolean mkdirs(Path directory) throws IOException;
+
+ /** Opens the given path for reading. */
+ FSDataInputStream open(Path path) throws IOException;
+
+ /** Renames path to newPath; FileSystemImpl refuses renames whose source and destination resolve to different file systems. */
+ boolean rename(Path path, Path newPath) throws IOException;
+
+ /** Moves the path to the Hadoop trash; returns the result reported by Hadoop's Trash. */
+ boolean moveToTrash(Path sourcePath) throws IOException;
+
+ /** Returns the default replication of the file system that owns the given path. */
+ short getDefaultReplication(Path logPath);
+
+ /** Returns whether the given path is a regular file. */
+ boolean isFile(Path path) throws IOException;
+
+ /** Returns false while any HDFS-backed file system reports that it is in safe mode. */
+ boolean isReady() throws IOException;
+
+ /** Returns the Hadoop file system registered as the default namespace. */
+ org.apache.hadoop.fs.FileSystem getDefaultNamespace();
+
+ /** Expands a glob pattern into the matching file statuses. */
+ FileStatus[] globStatus(Path path) throws IOException;
+
+ /** Builds the fully qualified file path referenced by a metadata table key (relative path taken from its column qualifier). */
+ String getFullPath(Key key);
+
+}
Propchange: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystem.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java?rev=1486202&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java (added)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java Fri May 24 19:59:11 2013
@@ -0,0 +1,360 @@
+package org.apache.accumulo.server.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.apache.log4j.Logger;
+
+// hide the need to make blocksize be an even number of checksum blocks
+//
+// int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
+// blockSize -= blockSize % checkSum;
+// blockSize = Math.max(blockSize, checkSum);
+// if buffersize is zero, get it from the filesystem
+//
+
+public class FileSystemImpl implements org.apache.accumulo.server.fs.FileSystem {
+
+ // Log under this class's own category; the bare name FileSystem here is the imported org.apache.hadoop.fs.FileSystem.
+ private static final Logger log = Logger.getLogger(FileSystemImpl.class);
+
+ Map<String, ? extends FileSystem> namespaces;
+ String defaultNamespace;
+ AccumuloConfiguration conf;
+
+ protected FileSystemImpl(Map<String, ? extends FileSystem> namespaces, String defaultNamespace, AccumuloConfiguration conf) {
+ this.namespaces = namespaces;
+ this.defaultNamespace = defaultNamespace;
+ this.conf = conf;
+ ensureSyncIsEnabled();
+ }
+
+ public static org.apache.accumulo.server.fs.FileSystem getLocal() throws IOException {
+ return new FileSystemImpl(Collections.singletonMap("", FileSystem.getLocal(CachedConfiguration.getInstance())), "", DefaultConfiguration.getDefaultConfiguration());
+ }
+
+ /**
+ * Closes every underlying file system, attempting all of them even when one fails.
+ *
+ * @throws IOException the first failure encountered; any later failures are logged rather than silently dropped
+ */
+ @Override
+ public void close() throws IOException {
+ IOException first = null;
+ for (FileSystem fs : namespaces.values()) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ if (first == null) {
+ first = e;
+ } else {
+ // only one exception can be thrown; record the others so they are not lost
+ log.warn("Additional failure while closing file systems", e);
+ }
+ }
+ }
+ if (first != null) {
+ throw first;
+ }
+ }
+
+ /**
+ * Tries to make sure nobody else still has the given file open for writing.
+ *
+ * @return the result of DistributedFileSystem.recoverLease on HDFS, or true after an append/close on a local file system
+ * @throws IllegalStateException if the file system is neither HDFS nor local
+ */
+ @Override
+ public boolean closePossiblyOpenFile(Path path) throws IOException {
+ FileSystem target = getFileSystemByPath(path);
+ if (target instanceof DistributedFileSystem) {
+ // a FileNotFoundException from recoverLease simply propagates to the caller
+ return ((DistributedFileSystem) target).recoverLease(path);
+ }
+ if (!(target instanceof LocalFileSystem)) {
+ throw new IllegalStateException("Don't know how to recover a lease for " + target.getClass().getName());
+ }
+ // local file system: open for append and close straight away
+ target.append(path).close();
+ log.info("Recovered lease on " + path.toString() + " using append");
+ return true;
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ return fs.create(path);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path path, boolean overwrite) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ return fs.create(path, overwrite);
+ }
+
+ private static long correctBlockSize(Configuration conf, long blockSize) {
+ if (blockSize <= 0)
+ blockSize = conf.getLong("dfs.block.size", 67108864);
+
+ int checkSum = conf.getInt("io.bytes.per.checksum", 512);
+ blockSize -= blockSize % checkSum;
+ blockSize = Math.max(blockSize, checkSum);
+ return blockSize;
+ }
+
+ /** Returns the given buffer size, or the configured "io.file.buffer.size" (default 4096) when it is not positive. */
+ private static int correctBufferSize(Configuration conf, int bufferSize) {
+ return bufferSize > 0 ? bufferSize : conf.getInt("io.file.buffer.size", 4096);
+ }
+
+
+ @Override
+ public FSDataOutputStream create(Path path, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ if (bufferSize == 0) {
+ fs.getConf().getInt("io.file.buffer.size", 4096);
+ }
+ return fs.create(path, overwrite, bufferSize, replication, correctBlockSize(fs.getConf(), blockSize));
+ }
+
+ @Override
+ public boolean createNewFile(Path path) throws IOException {
+ FileSystem fs = getFileSystemByPath(path);
+ return fs.createNewFile(path);
+ }
+ /**
+ * Creates a log file, asking for the SYNC_BLOCK create flag when the running Hadoop version has it.
+ * <p>
+ * The org.apache.hadoop.fs.CreateFlag enum is looked up reflectively so this compiles against Hadoop versions
+ * that lack it. If the class is missing, or anything else goes wrong during the reflective call (including one
+ * of the two flags not being found), the method falls back to a plain overwrite-enabled create.
+ *
+ * @param bufferSize stream buffer size; non-positive values are replaced via correctBufferSize
+ * @param blockSize block size; adjusted via correctBlockSize
+ */
+ @Override
+ public FSDataOutputStream createSyncable(Path logPath, int bufferSize, short replication, long blockSize) throws IOException {
+ FileSystem fs = getFileSystemByPath(logPath);
+ blockSize = correctBlockSize(fs.getConf(), blockSize);
+ bufferSize = correctBufferSize(fs.getConf(), bufferSize);
+ try {
+ // This...
+ // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+ // return fs.create(logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
+ // Becomes this:
+ Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
+ List<Enum<?>> flags = new ArrayList<Enum<?>>();
+ if (createFlags.isEnum()) {
+ // collect the two constants by name, in declaration order
+ for (Object constant : createFlags.getEnumConstants()) {
+ if (constant.toString().equals("SYNC_BLOCK")) {
+ flags.add((Enum<?>)constant);
+ log.debug("Found synch enum " + constant);
+ }
+ if (constant.toString().equals("CREATE")) {
+ flags.add((Enum<?>)constant);
+ log.debug("Found CREATE enum " + constant);
+ }
+ }
+ }
+ // throws IndexOutOfBoundsException if either flag was not found; handled by the generic catch below
+ Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
+ log.debug("CreateFlag set: " + set);
+ Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
+ log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
+ return (FSDataOutputStream)create.invoke(fs, logPath, FsPermission.getDefault(), set, bufferSize, replication, blockSize, null);
+ } catch (ClassNotFoundException ex) {
+ // Expected in hadoop 1.0
+ return fs.create(logPath, true, bufferSize, replication, blockSize);
+ } catch (Exception ex) {
+ // any other reflective failure: note it at debug level and fall back to the plain create
+ log.debug(ex, ex);
+ return fs.create(logPath, true, bufferSize, replication, blockSize);
+ }
+ }
+
+ @Override
+ public boolean delete(Path path) throws IOException{
+ return getFileSystemByPath(path).delete(path, false);
+ }
+
+ @Override
+ public boolean deleteRecursively(Path path) throws IOException {
+ return getFileSystemByPath(path).delete(path, true);
+ }
+
+ private void ensureSyncIsEnabled() {
+ for (FileSystem fs : getFileSystems()) {
+ if (fs instanceof DistributedFileSystem) {
+ if (!fs.getConf().getBoolean("dfs.durable.sync", false) && !fs.getConf().getBoolean("dfs.support.append", false)) {
+ String msg = "Must set dfs.durable.sync OR dfs.support.append to true. Which one needs to be set depends on your version of HDFS. See ACCUMULO-623. \n"
+ + "HADOOP RELEASE VERSION SYNC NAME DEFAULT\n"
+ + "Apache Hadoop 0.20.205 dfs.support.append false\n"
+ + "Apache Hadoop 0.23.x dfs.support.append true\n"
+ + "Apache Hadoop 1.0.x dfs.support.append false\n"
+ + "Apache Hadoop 1.1.x dfs.durable.sync true\n"
+ + "Apache Hadoop 2.0.0-2.0.2 dfs.support.append true\n"
+ + "Cloudera CDH 3u0-3u3 ???? true\n"
+ + "Cloudera CDH 3u4 dfs.support.append true\n"
+ + "Hortonworks HDP `1.0 dfs.support.append false\n"
+ + "Hortonworks HDP `1.1 dfs.support.append false";
+ log.fatal(msg);
+ System.exit(-1);
+ }
+ try {
+ // if this class exists
+ Class.forName("org.apache.hadoop.fs.CreateFlag");
+ // we're running hadoop 2.0, 1.1
+ if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
+ log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss");
+ }
+ } catch (ClassNotFoundException ex) {
+ // hadoop 1.0
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ return getFileSystemByPath(path).exists(path);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ return getFileSystemByPath(path).getFileStatus(path);
+ }
+
+ /**
+ * Returns the Hadoop file system responsible for {@code path}.
+ * At present the path is not inspected: every lookup resolves to the default namespace.
+ */
+ @Override
+ public FileSystem getFileSystemByPath(Path path) {
+ // this runs for every file operation, so keep it out of the INFO log
+ if (log.isTraceEnabled()) {
+ log.trace("Looking up namespace on " + path);
+ }
+ return namespaces.get(defaultNamespace);
+ }
+
+ @Override
+ public FileSystem getFileSystemByPath(String path) {
+ return getFileSystemByPath(new Path(path));
+ }
+
+ @Override
+ public Collection<FileSystem> getFileSystems() {
+ return new ArrayList<FileSystem>(namespaces.values());
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ return getFileSystemByPath(path).listStatus(path);
+ }
+
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+ return getFileSystemByPath(path).mkdirs(path);
+ }
+
+ @Override
+ public FSDataInputStream open(Path path) throws IOException {
+ return getFileSystemByPath(path).open(path);
+ }
+
+ @Override
+ public boolean rename(Path path, Path newPath)throws IOException {
+ FileSystem source = getFileSystemByPath(path);
+ FileSystem dest = getFileSystemByPath(newPath);
+ if (source != dest) {
+ throw new NotImplementedException("Cannot rename files across namespaces: " + path + " -> " + newPath);
+ }
+ return source.rename(path, newPath);
+ }
+
+ @Override
+ public boolean moveToTrash(Path path) throws IOException{
+ FileSystem fs = getFileSystemByPath(path);
+ Trash trash = new Trash(fs, fs.getConf());
+ return trash.moveToTrash(path);
+ }
+
+ @Override
+ public short getDefaultReplication(Path path) {
+ return getFileSystemByPath(path).getDefaultReplication();
+ }
+
+ @Override
+ public boolean isFile(Path path) throws IOException {
+ return getFileSystemByPath(path).isFile(path);
+ }
+
+ public static org.apache.accumulo.server.fs.FileSystem get() throws IOException {
+ AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(HdfsZooInstance.getInstance());
+ return new FileSystemImpl(Collections.singletonMap("", FileSystem.get(CachedConfiguration.getInstance())), "", conf);
+ }
+
+ @Override
+ public boolean isReady() throws IOException {
+ for (FileSystem fs : getFileSystems()) {
+ if (!(fs instanceof DistributedFileSystem))
+ continue;
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ // So this: if (!dfs.setSafeMode(SafeModeAction.SAFEMODE_GET))
+ // Becomes this:
+ Class<?> safeModeAction;
+ try {
+ // hadoop 2.0
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction");
+ } catch (ClassNotFoundException ex) {
+ // hadoop 1.0
+ try {
+ safeModeAction = Class.forName("org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Cannot figure out the right class for Constants");
+ }
+ }
+ Object get = null;
+ for (Object obj : safeModeAction.getEnumConstants()) {
+ if (obj.toString().equals("SAFEMODE_GET"))
+ get = obj;
+ }
+ if (get == null) {
+ throw new RuntimeException("cannot find SAFEMODE_GET");
+ }
+ try {
+ Method setSafeMode = dfs.getClass().getMethod("setSafeMode", safeModeAction);
+ boolean inSafeMode = (Boolean) setSafeMode.invoke(dfs, get);
+ if (inSafeMode) {
+ return false;
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException("cannot find method setSafeMode");
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public FileSystem getDefaultNamespace() {
+ return namespaces.get(defaultNamespace);
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern) throws IOException {
+ return getFileSystemByPath(pathPattern).globStatus(pathPattern);
+ }
+
+ @Override
+ public String getFullPath(Key key) {
+
+ String relPath = key.getColumnQualifierData().toString();
+ byte [] tableId = KeyExtent.tableOfMetadataRow(key.getRow());
+
+ if (relPath.startsWith("../"))
+ relPath = relPath.substring(2);
+ else
+ relPath = "/" + new String(tableId) + relPath;
+ String fullPath = Constants.getTablesDir(conf) + relPath;
+ FileSystem ns = getFileSystemByPath(fullPath);
+ return ns.makeQualified(new Path(fullPath)).toString();
+ }
+
+}
Propchange: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/fs/FileSystemImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/GarbageCollectWriteAheadLogs.java Fri May 24 19:59:11 2013
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.tabletse
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.AddressUtil;
import org.apache.accumulo.server.util.MetadataTable;
@@ -47,9 +48,7 @@ import org.apache.accumulo.trace.instrum
import org.apache.accumulo.trace.instrument.Trace;
import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.zookeeper.KeeperException;
@@ -60,13 +59,11 @@ public class GarbageCollectWriteAheadLog
private final Instance instance;
private final FileSystem fs;
- private Trash trash;
+ private boolean useTrash;
- GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs, boolean noTrash) throws IOException {
+ GarbageCollectWriteAheadLogs(Instance instance, FileSystem fs, boolean useTrash) throws IOException {
this.instance = instance;
this.fs = fs;
- if (!noTrash)
- this.trash = new Trash(fs, fs.getConf());
+ // store the flag: without this the field keeps its default (false) and WALs are never moved to trash
+ this.useTrash = useTrash;
}
public void collect(GCStatus status) {
@@ -139,8 +136,8 @@ public class GarbageCollectWriteAheadLog
log.debug("Removing old-style WAL " + entry.getValue());
try {
Path path = new Path(Constants.getWalDirectory(conf), filename);
- if (trash == null || !trash.moveToTrash(path))
- fs.delete(path, true);
+ if (!useTrash || !fs.moveToTrash(path))
+ fs.deleteRecursively(path);
status.currentLog.deleted++;
} catch (FileNotFoundException ex) {
// ignored
@@ -156,8 +153,8 @@ public class GarbageCollectWriteAheadLog
log.debug("Removing WAL for offline server " + filename);
try {
Path path = new Path(serverPath, filename);
- if (trash == null || !trash.moveToTrash(path))
- fs.delete(path, true);
+ if (!useTrash || !fs.moveToTrash(path))
+ fs.deleteRecursively(path);
status.currentLog.deleted++;
} catch (FileNotFoundException ex) {
// ignored
@@ -189,8 +186,8 @@ public class GarbageCollectWriteAheadLog
log.debug("Removing sorted WAL " + sortedWALog);
Path swalog = new Path(recoveryDir, sortedWALog);
try {
- if (trash == null || !trash.moveToTrash(swalog)) {
- fs.delete(swalog, true);
+ if (!useTrash || !fs.moveToTrash(swalog)) {
+ fs.deleteRecursively(swalog);
}
} catch (FileNotFoundException ex) {
// ignored
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Fri May 24 19:59:11 2013
@@ -56,7 +56,6 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.gc.thrift.GCMonitorService.Iface;
import org.apache.accumulo.core.gc.thrift.GCMonitorService.Processor;
import org.apache.accumulo.core.gc.thrift.GCStatus;
@@ -65,7 +64,6 @@ import org.apache.accumulo.core.master.s
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.SecurityUtil;
import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ServerServices.Service;
@@ -77,11 +75,11 @@ import org.apache.accumulo.server.Accumu
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.accumulo.server.security.SecurityConstants;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.util.Halt;
-import org.apache.accumulo.server.util.OfflineMetadataScanner;
import org.apache.accumulo.server.util.TServerUtils;
import org.apache.accumulo.server.util.TabletIterator;
import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -92,9 +90,7 @@ import org.apache.accumulo.trace.instrum
import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
import org.apache.accumulo.trace.thrift.TInfo;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
@@ -126,7 +122,7 @@ public class SimpleGarbageCollector impl
private long gcStartDelay;
private boolean checkForBulkProcessingFiles;
private FileSystem fs;
- private Trash trash = null;
+ private boolean useTrash = true;
private boolean safemode = false, offline = false, verbose = false;
private String address = "localhost";
private ZooLock lock;
@@ -143,7 +139,7 @@ public class SimpleGarbageCollector impl
Instance instance = HdfsZooInstance.getInstance();
ServerConfiguration serverConf = new ServerConfiguration(instance);
- final FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), serverConf.getConfiguration());
+ final FileSystem fs = FileSystemImpl.get();
Accumulo.init(fs, serverConf, "gc");
String address = "localhost";
SimpleGarbageCollector gc = new SimpleGarbageCollector();
@@ -183,7 +179,7 @@ public class SimpleGarbageCollector impl
}
public void init(FileSystem fs, Instance instance, TCredentials credentials, boolean noTrash) throws IOException {
- this.fs = TraceFileSystem.wrap(fs);
+ this.fs = fs;
this.credentials = credentials;
this.instance = instance;
@@ -197,9 +193,7 @@ public class SimpleGarbageCollector impl
log.info("verbose: " + verbose);
log.info("memory threshold: " + CANDIDATE_MEMORY_PERCENTAGE + " of " + Runtime.getRuntime().maxMemory() + " bytes");
log.info("delete threads: " + numDeleteThreads);
- if (!noTrash) {
- this.trash = new Trash(fs, fs.getConf());
- }
+ useTrash = !noTrash;
}
private void run() {
@@ -299,7 +293,7 @@ public class SimpleGarbageCollector impl
// Clean up any unused write-ahead logs
Span waLogs = Trace.start("walogs");
try {
- GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, trash == null);
+ GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(instance, fs, useTrash);
log.info("Beginning garbage collection of write-ahead logs");
walogCollector.collect(status);
} catch (Exception e) {
@@ -329,10 +323,10 @@ public class SimpleGarbageCollector impl
}
private boolean moveToTrash(Path path) throws IOException {
- if (trash == null)
+ if (!useTrash)
return false;
try {
- return trash.moveToTrash(path);
+ return fs.moveToTrash(path);
} catch (FileNotFoundException ex) {
return false;
}
@@ -377,7 +371,7 @@ public class SimpleGarbageCollector impl
if (tabletDirs.length == 0) {
Path p = new Path(ServerConstants.getTablesDir() + "/" + delTableId);
if (!moveToTrash(p))
- fs.delete(p, false);
+ fs.delete(p);
}
}
}
@@ -506,11 +500,13 @@ public class SimpleGarbageCollector impl
Scanner scanner;
if (offline) {
- try {
- scanner = new OfflineMetadataScanner(instance.getConfiguration(), fs);
- } catch (IOException e) {
- throw new IllegalStateException("Unable to create offline metadata scanner", e);
- }
+ // TODO
+ throw new RuntimeException("Offline scanner no longer supported");
+// try {
+// scanner = new OfflineMetadataScanner(instance.getConfiguration(), fs);
+// } catch (IOException e) {
+// throw new IllegalStateException("Unable to create offline metadata scanner", e);
+// }
} else {
try {
scanner = new IsolatedScanner(instance.getConnector(credentials.getPrincipal(), CredentialHelper.extractToken(credentials)).createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
@@ -668,7 +664,7 @@ public class SimpleGarbageCollector impl
Path p = new Path(fullPath);
- if (moveToTrash(p) || fs.delete(p, true)) {
+ if (moveToTrash(p) || fs.deleteRecursively(p)) {
// delete succeeded, still want to delete
removeFlag = true;
synchronized (SimpleGarbageCollector.this) {
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/logger/LogReader.java Fri May 24 19:59:11 2013
@@ -30,15 +30,11 @@ import java.util.regex.Pattern;
import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.file.FileUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.tabletserver.log.DfsLogger;
import org.apache.accumulo.server.tabletserver.log.MultiReader;
-import org.apache.accumulo.server.trace.TraceFileSystem;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -70,9 +66,7 @@ public class LogReader {
public static void main(String[] args) throws IOException {
Opts opts = new Opts();
opts.parseArgs(LogReader.class.getName(), args);
- Configuration conf = CachedConfiguration.getInstance();
- FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(conf, ServerConfiguration.getSiteConfiguration()));
- FileSystem local = TraceFileSystem.wrap(FileSystem.getLocal(conf));
+ FileSystem fs = FileSystemImpl.get();
Matcher rowMatcher = null;
KeyExtent ke = null;
@@ -117,25 +111,9 @@ public class LogReader {
} finally {
f.close();
}
- } else if (local.isFile(path)) {
- // read log entries from a simple file
- FSDataInputStream f = DfsLogger.readHeader(fs, path, meta);
- try {
- while (true) {
- try {
- key.readFields(f);
- value.readFields(f);
- } catch (EOFException ex) {
- break;
- }
- printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
- }
- } finally {
- f.close();
- }
} else {
// read the log entries sorted in a map file
- MultiReader input = new MultiReader(fs, conf, file);
+ MultiReader input = new MultiReader(fs, file);
while (input.next(key, value)) {
printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
}
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/Master.java Fri May 24 19:59:11 2013
@@ -66,7 +66,6 @@ import org.apache.accumulo.core.data.Par
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -83,7 +82,6 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.zookeeper.ZooUtil;
@@ -98,6 +96,8 @@ import org.apache.accumulo.fate.zookeepe
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
import org.apache.accumulo.server.master.balancer.TabletBalancer;
@@ -141,7 +141,6 @@ import org.apache.accumulo.server.securi
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.util.AddressUtil;
import org.apache.accumulo.server.util.DefaultMap;
import org.apache.accumulo.server.util.Halt;
@@ -156,7 +155,6 @@ import org.apache.accumulo.server.zookee
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -452,7 +450,7 @@ public class Master implements LiveTServ
public Master(ServerConfiguration config, FileSystem fs, String hostname) throws IOException {
this.serverConfig = config;
this.instance = config.getInstance();
- this.fs = TraceFileSystem.wrap(fs);
+ this.fs = fs;
this.hostname = hostname;
AccumuloConfiguration aconf = serverConfig.getConfiguration();
@@ -2229,7 +2227,7 @@ public class Master implements LiveTServ
try {
SecurityUtil.serverLogin();
- FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+ FileSystem fs = FileSystemImpl.get();
String hostname = Accumulo.getLocalAddress(args);
Instance instance = HdfsZooInstance.getInstance();
ServerConfiguration conf = new ServerConfiguration(instance);
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java Fri May 24 19:59:11 2013
@@ -21,7 +21,7 @@ import java.io.IOException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.server.master.Master;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -33,9 +33,9 @@ public class HadoopLogCloser implements
@Override
public long close(Master master, FileSystem fs, Path source) throws IOException {
-
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(source);
+ if (ns instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem) ns;
try {
if (!dfs.recoverLease(source)) {
log.info("Waiting for file to be closed " + source.toString());
@@ -48,12 +48,12 @@ public class HadoopLogCloser implements
} catch (Exception ex) {
log.warn("Error recovery lease on " + source.toString(), ex);
}
- } else if (fs instanceof LocalFileSystem) {
+ } else if (ns instanceof LocalFileSystem) {
// ignore
} else {
throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
}
- fs.append(source).close();
+ ns.append(source).close();
log.info("Recovered lease on " + source.toString() + " using append");
return 0;
}
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java Fri May 24 19:59:11 2013
@@ -19,7 +19,7 @@ package org.apache.accumulo.server.maste
import java.io.IOException;
import org.apache.accumulo.server.master.Master;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public interface LogCloser {
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java Fri May 24 19:59:11 2013
@@ -19,7 +19,7 @@ package org.apache.accumulo.server.maste
import java.io.IOException;
import org.apache.accumulo.server.master.Master;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.log4j.Logger;
@@ -31,9 +31,10 @@ public class MapRLogCloser implements Lo
@Override
public long close(Master m, FileSystem fs, Path path) throws IOException {
log.info("Recovering file " + path.toString() + " by changing permission to readonly");
+ org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
FsPermission roPerm = new FsPermission((short) 0444);
try {
- fs.setPermission(path, roPerm);
+ ns.setPermission(path, roPerm);
return 0;
} catch (IOException ex) {
log.error("error recovering lease ", ex);
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java Fri May 24 19:59:11 2013
@@ -35,10 +35,9 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
@@ -82,8 +81,6 @@ public class RecoveryManager {
boolean rescheduled = false;
try {
FileSystem localFs = master.getFileSystem();
- if (localFs instanceof TraceFileSystem)
- localFs = ((TraceFileSystem) localFs).getImplementation();
long time = closer.close(master, localFs, getSource(host, filename));
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java Fri May 24 19:59:11 2013
@@ -17,17 +17,15 @@
package org.apache.accumulo.server.master.state;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.master.thrift.MasterGoalState;
import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.FileSystem;
public class SetGoalState {
@@ -41,7 +39,7 @@ public class SetGoalState {
}
SecurityUtil.serverLogin();
- FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+ FileSystem fs = FileSystemImpl.get();
Accumulo.waitForZookeeperAndHdfs(fs);
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(),
NodeExistsPolicy.OVERWRITE);
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Fri May 24 19:59:11 2013
@@ -74,7 +74,7 @@ import org.apache.accumulo.server.zookee
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
@@ -440,7 +440,7 @@ class CopyFailed extends MasterRepo {
bifCopyQueue.waitUntilDone(workIds);
}
- fs.delete(new Path(error, BulkImport.FAILURES_TXT), true);
+ fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
return new CleanUpBulkImport(tableId, source, bulk, error);
}
@@ -500,12 +500,12 @@ class LoadFiles extends MasterRepo {
Path writable = new Path(this.errorDir, ".iswritable");
if (!fs.createNewFile(writable)) {
// Maybe this is a re-try... clear the flag and try again
- fs.delete(writable, false);
+ fs.delete(writable);
if (!fs.createNewFile(writable))
throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
"Unable to write to " + this.errorDir);
}
- fs.delete(writable, false);
+ fs.delete(writable);
final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
for (FileStatus f : files)
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java Fri May 24 19:59:11 2013
@@ -22,7 +22,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
-import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -47,12 +46,11 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.master.Master;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -169,7 +167,7 @@ class WriteExportFiles extends MasterRep
exportConfig(conn, tableID, zipOut, dataOut);
dataOut.flush();
- Map<String,String> uniqueFiles = exportMetadata(conn, tableID, zipOut, dataOut);
+ Map<String,String> uniqueFiles = exportMetadata(fs, conn, tableID, zipOut, dataOut);
dataOut.close();
dataOut = null;
@@ -186,20 +184,12 @@ class WriteExportFiles extends MasterRep
BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false)));
try {
- URI uri = fs.getUri();
-
- for (String relPath : uniqueFiles.values()) {
- Path absPath = new Path(uri.getScheme(), uri.getAuthority(), ServerConstants.getTablesDir() + relPath);
- distcpOut.append(absPath.toUri().toString());
+ for (String file : uniqueFiles.values()) {
+ distcpOut.append(file);
distcpOut.newLine();
}
- Path absEMP = exportMetaFilePath;
- if (!exportMetaFilePath.isAbsolute())
- absEMP = new Path(fs.getWorkingDirectory().toUri().getPath(), exportMetaFilePath);
-
- distcpOut.append(new Path(uri.getScheme(), uri.getAuthority(), absEMP.toString()).toUri().toString());
-
+ distcpOut.append(exportMetaFilePath.toString());
distcpOut.newLine();
distcpOut.close();
@@ -211,7 +201,7 @@ class WriteExportFiles extends MasterRep
}
}
- private static Map<String,String> exportMetadata(Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
+ private static Map<String,String> exportMetadata(FileSystem fs, Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
TableNotFoundException {
zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE));
@@ -228,24 +218,18 @@ class WriteExportFiles extends MasterRep
entry.getValue().write(dataOut);
if (entry.getKey().getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
- String relPath = entry.getKey().getColumnQualifierData().toString();
-
- if (relPath.startsWith("../"))
- relPath = relPath.substring(2);
- else
- relPath = "/" + tableID + relPath;
-
- String tokens[] = relPath.split("/");
- if (tokens.length != 4) {
- throw new RuntimeException("Illegal path " + relPath);
+ String path = fs.getFullPath(entry.getKey());
+ String tokens[] = path.split("/");
+ if (tokens.length < 1) {
+ throw new RuntimeException("Illegal path " + path);
}
- String filename = tokens[3];
+ String filename = tokens[tokens.length - 1];
String existingPath = uniqueFiles.get(filename);
if (existingPath == null) {
- uniqueFiles.put(filename, relPath);
- } else if (!existingPath.equals(relPath)) {
+ uniqueFiles.put(filename, path);
+ } else if (!existingPath.equals(path)) {
// make sure file names are unique, should only apply for tables with file names generated by Accumulo 1.3 and earlier
throw new IOException("Cannot export table with nonunique file names " + filename + ". Major compact table.");
}
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java Fri May 24 19:59:11 2013
@@ -61,7 +61,7 @@ import org.apache.accumulo.server.tablet
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.util.TablePropUtil;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -98,7 +98,7 @@ class FinishImportTable extends MasterRe
@Override
public Repo<Master> call(long tid, Master env) throws Exception {
- env.getFileSystem().delete(new Path(tableInfo.importDir, "mappings.txt"), true);
+ env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir, "mappings.txt"));
TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE);
@@ -365,7 +365,7 @@ class MapImportFileNames extends MasterR
@Override
public void undo(long tid, Master env) throws Exception {
- env.getFileSystem().delete(new Path(tableInfo.importDir), true);
+ env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir));
}
}
@@ -414,7 +414,8 @@ class ImportPopulateZookeeper extends Ma
Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
try {
- return TableOperationsImpl.getExportedProps(fs, path);
+ org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
+ return TableOperationsImpl.getExportedProps(ns, path);
} catch (IOException ioe) {
throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
"Error reading table props from " + path + " " + ioe.getMessage());
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java Fri May 24 19:59:11 2013
@@ -27,13 +27,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.gc.thrift.GCMonitorService;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.master.thrift.Compacting;
@@ -42,7 +40,6 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.Pair;
@@ -54,6 +51,8 @@ import org.apache.accumulo.core.zookeepe
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.monitor.servlets.DefaultServlet;
import org.apache.accumulo.server.monitor.servlets.GcStatusServlet;
import org.apache.accumulo.server.monitor.servlets.JSONServlet;
@@ -73,7 +72,7 @@ import org.apache.accumulo.server.proble
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.EmbeddedWebServer;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -450,7 +449,7 @@ public class Monitor {
public static void main(String[] args) throws Exception {
SecurityUtil.serverLogin();
- FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+ FileSystem fs = FileSystemImpl.get();
String hostname = Accumulo.getLocalAddress(args);
instance = HdfsZooInstance.getInstance();
config = new ServerConfiguration(instance);
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java Fri May 24 19:59:11 2013
@@ -43,12 +43,12 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReportingIterator;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.util.time.SimpleTimer;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -305,7 +305,8 @@ public class FileManager {
for (String file : filesToOpen) {
try {
// log.debug("Opening "+file);
- FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, fs.getConf(), conf.getTableConfiguration(table.toString()),
+ org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(file);
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
dataCache, indexCache);
reservedFiles.add(reader);
readersReserved.put(reader, file);
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java Fri May 24 19:59:11 2013
@@ -1422,7 +1422,7 @@ public class Tablet {
for (String relPath : datafiles.keySet())
absPaths.add(rel2abs(relPath, extent));
- tabletServer.recover(this, logEntries, absPaths, new MutationReceiver() {
+ tabletServer.recover(this.tabletServer.getFileSystem(), this, logEntries, absPaths, new MutationReceiver() {
public void receive(Mutation m) {
// LogReader.printMutation(m);
Collection<ColumnUpdate> muts = m.getUpdates();
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Fri May 24 19:59:11 2013
@@ -18,7 +18,6 @@ package org.apache.accumulo.server.table
import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD;
-import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -46,7 +45,6 @@ import java.util.SortedSet;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
@@ -97,7 +95,6 @@ import org.apache.accumulo.core.data.thr
import org.apache.accumulo.core.data.thrift.TMutation;
import org.apache.accumulo.core.data.thrift.TRange;
import org.apache.accumulo.core.data.thrift.UpdateErrors;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.iterators.IterationInterruptedException;
import org.apache.accumulo.core.master.thrift.Compacting;
import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -120,7 +117,6 @@ import org.apache.accumulo.core.tabletse
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
@@ -143,8 +139,8 @@ import org.apache.accumulo.server.client
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.data.ServerMutation;
-import org.apache.accumulo.server.logger.LogFileKey;
-import org.apache.accumulo.server.logger.LogFileValue;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.DistributedStoreException;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -182,7 +178,6 @@ import org.apache.accumulo.server.tablet
import org.apache.accumulo.server.tabletserver.metrics.TabletServerMinCMetrics;
import org.apache.accumulo.server.tabletserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.server.tabletserver.metrics.TabletServerUpdateMetrics;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.util.FileSystemMonitor;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.util.MapCounter;
@@ -205,14 +200,7 @@ import org.apache.accumulo.trace.instrum
import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
import org.apache.accumulo.trace.thrift.TInfo;
import org.apache.commons.collections.map.LRUMap;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
@@ -246,7 +234,7 @@ public class TabletServer extends Abstra
super();
this.serverConfig = conf;
this.instance = conf.getInstance();
- this.fs = TraceFileSystem.wrap(fs);
+ this.fs = fs;
this.logSorter = new LogSorter(instance, fs, getSystemConfiguration());
SimpleTimer.getInstance().schedule(new Runnable() {
@Override
@@ -2091,13 +2079,12 @@ public class TabletServer extends Abstra
log.error("rename is unsuccessful");
} else {
log.info("Deleting walog " + filename);
- Trash trash = new Trash(fs, fs.getConf());
Path sourcePath = new Path(source);
- if (!trash.moveToTrash(sourcePath) && !fs.delete(sourcePath, true))
+ if (!fs.moveToTrash(sourcePath) && !fs.deleteRecursively(sourcePath))
log.warn("Failed to delete walog " + source);
Path recoveryPath = new Path(Constants.getRecoveryDir(acuConf), filename);
try {
- if (trash.moveToTrash(recoveryPath) || fs.delete(recoveryPath, true))
+ if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath))
log.info("Deleted any recovery log " + filename);
} catch (FileNotFoundException ex) {
// ignore
@@ -3220,13 +3207,11 @@ public class TabletServer extends Abstra
public static void main(String[] args) throws IOException {
try {
SecurityUtil.serverLogin();
- FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+ FileSystem fs = FileSystemImpl.get();
String hostname = Accumulo.getLocalAddress(args);
Instance instance = HdfsZooInstance.getInstance();
ServerConfiguration conf = new ServerConfiguration(instance);
Accumulo.init(fs, conf, "tserver");
- ensureHdfsSyncIsEnabled(fs);
- recoverLocalWriteAheadLogs(fs, conf);
TabletServer server = new TabletServer(conf, fs);
server.config(hostname);
Accumulo.enableTracing(hostname, "tserver");
@@ -3236,91 +3221,6 @@ public class TabletServer extends Abstra
}
}
- private static void ensureHdfsSyncIsEnabled(FileSystem fs) {
- if (fs instanceof DistributedFileSystem) {
- if (!fs.getConf().getBoolean("dfs.durable.sync", false) && !fs.getConf().getBoolean("dfs.support.append", false)) {
- String msg = "Must set dfs.durable.sync OR dfs.support.append to true. Which one needs to be set depends on your version of HDFS. See ACCUMULO-623. \n"
- + "HADOOP RELEASE VERSION SYNC NAME DEFAULT\n"
- + "Apache Hadoop 0.20.205 dfs.support.append false\n"
- + "Apache Hadoop 0.23.x dfs.support.append true\n"
- + "Apache Hadoop 1.0.x dfs.support.append false\n"
- + "Apache Hadoop 1.1.x dfs.durable.sync true\n"
- + "Apache Hadoop 2.0.0-2.0.2 dfs.support.append true\n"
- + "Cloudera CDH 3u0-3u3 ???? true\n"
- + "Cloudera CDH 3u4 dfs.support.append true\n"
- + "Hortonworks HDP `1.0 dfs.support.append false\n"
- + "Hortonworks HDP `1.1 dfs.support.append false";
- log.fatal(msg);
- System.exit(-1);
- }
- try {
- // if this class exists
- Class.forName("org.apache.hadoop.fs.CreateFlag");
- // we're running hadoop 2.0, 1.1
- if (!fs.getConf().getBoolean("dfs.datanode.synconclose", false)) {
- log.warn("dfs.datanode.synconclose set to false: data loss is possible on system reset or power loss");
- }
- } catch (ClassNotFoundException ex) {
- // hadoop 1.0
- }
- }
-
- }
-
- /**
- * Copy local walogs into HDFS on an upgrade
- *
- */
- public static void recoverLocalWriteAheadLogs(FileSystem fs, ServerConfiguration serverConf) throws IOException {
- FileSystem localfs = FileSystem.getLocal(fs.getConf()).getRawFileSystem();
- AccumuloConfiguration conf = serverConf.getConfiguration();
- String localWalDirectories = conf.get(Property.LOGGER_DIR);
- for (String localWalDirectory : localWalDirectories.split(",")) {
- if (!localWalDirectory.startsWith("/")) {
- localWalDirectory = System.getenv("ACCUMULO_HOME") + "/" + localWalDirectory;
- }
-
- FileStatus status = null;
- try {
- status = localfs.getFileStatus(new Path(localWalDirectory));
- } catch (FileNotFoundException fne) {}
-
- if (status == null || !status.isDir()) {
- log.debug("Local walog dir " + localWalDirectory + " not found ");
- continue;
- }
-
- for (FileStatus file : localfs.listStatus(new Path(localWalDirectory))) {
- String name = file.getPath().getName();
- try {
- UUID.fromString(name);
- } catch (IllegalArgumentException ex) {
- log.info("Ignoring non-log file " + name + " in " + localWalDirectory);
- continue;
- }
- LogFileKey key = new LogFileKey();
- LogFileValue value = new LogFileValue();
- log.info("Openning local log " + file.getPath());
- Reader reader = new SequenceFile.Reader(localfs, file.getPath(), localfs.getConf());
- Path tmp = new Path(Constants.getWalDirectory(conf) + "/" + name + ".copy");
- FSDataOutputStream writer = fs.create(tmp);
- while (reader.next(key, value)) {
- try {
- key.write(writer);
- value.write(writer);
- } catch (EOFException ex) {
- break;
- }
- }
- writer.close();
- reader.close();
- fs.rename(tmp, new Path(tmp.getParent(), name));
- log.info("Copied local log " + name);
- localfs.delete(new Path(localWalDirectory, name), true);
- }
- }
- }
-
public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException {
totalMinorCompactions++;
logger.minorCompactionFinished(tablet, newDatafile, walogSeq);
@@ -3330,7 +3230,7 @@ public class TabletServer extends Abstra
logger.minorCompactionStarted(tablet, lastUpdateSequence, newMapfileLocation);
}
- public void recover(Tablet tablet, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
+ public void recover(FileSystem fs, Tablet tablet, List<LogEntry> logEntries, Set<String> tabletFiles, MutationReceiver mutationReceiver) throws IOException {
List<String> recoveryLogs = new ArrayList<String>();
List<LogEntry> sorted = new ArrayList<LogEntry>(logEntries);
Collections.sort(sorted, new Comparator<LogEntry>() {
@@ -3355,7 +3255,7 @@ public class TabletServer extends Abstra
throw new IOException("Unable to find recovery files for extent " + tablet.getExtent() + " logEntry: " + entry);
recoveryLogs.add(recovery);
}
- logger.recover(tablet, recoveryLogs, tabletFiles, mutationReceiver);
+ logger.recover(fs, tablet, recoveryLogs, tabletFiles, mutationReceiver);
}
private final AtomicInteger logIdGenerator = new AtomicInteger();
@@ -3561,5 +3461,9 @@ public class TabletServer extends Abstra
}
};
}
+
+ public FileSystem getFileSystem() {
+ return fs;
+ }
}
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Fri May 24 19:59:11 2013
@@ -48,12 +48,12 @@ import org.apache.accumulo.core.util.Met
import org.apache.accumulo.core.util.NamingThreadFactory;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
/**
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Fri May 24 19:59:11 2013
@@ -29,7 +29,6 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,19 +43,14 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.StringUtil;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tabletserver.TabletMutations;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.fs.CreateFlag;
-//import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
/**
@@ -258,17 +252,14 @@ public class DfsLogger {
FileSystem fs = conf.getFileSystem();
short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
if (replication == 0)
- replication = fs.getDefaultReplication();
+ replication = fs.getDefaultReplication(logPath);
long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE);
if (blockSize == 0)
blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1);
- int checkSum = fs.getConf().getInt("io.bytes.per.checksum", 512);
- blockSize -= blockSize % checkSum;
- blockSize = Math.max(blockSize, checkSum);
if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC))
- logFile = create(fs, logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+ logFile = fs.createSyncable(logPath, 0, replication, blockSize);
else
- logFile = fs.create(logPath, true, fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
+ logFile = fs.create(logPath, true, 0, replication, blockSize);
try {
// sync: send data to datanodes
@@ -328,43 +319,6 @@ public class DfsLogger {
t.start();
}
- private FSDataOutputStream create(FileSystem fs, Path logPath, boolean b, int buffersize, short replication, long blockSize) throws IOException {
- try {
- // This...
- // EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
- // return fs.create(logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
- // Becomes this:
- Class<?> createFlags = Class.forName("org.apache.hadoop.fs.CreateFlag");
- List<Enum<?>> flags = new ArrayList<Enum<?>>();
- if (createFlags.isEnum()) {
- for (Object constant : createFlags.getEnumConstants()) {
- if (constant.toString().equals("SYNC_BLOCK")) {
- flags.add((Enum<?>)constant);
- log.debug("Found synch enum " + constant);
- }
- if (constant.toString().equals("CREATE")) {
- flags.add((Enum<?>)constant);
- log.debug("Found CREATE enum " + constant);
- }
- }
- }
- Object set = EnumSet.class.getMethod("of", java.lang.Enum.class, java.lang.Enum.class).invoke(null, flags.get(0), flags.get(1));
- log.debug("CreateFlag set: " + set);
- if (fs instanceof TraceFileSystem) {
- fs = ((TraceFileSystem)fs).getImplementation();
- }
- Method create = fs.getClass().getMethod("create", Path.class, FsPermission.class, EnumSet.class, Integer.TYPE, Short.TYPE, Long.TYPE, Progressable.class);
- log.debug("creating " + logPath + " with SYNCH_BLOCK flag");
- return (FSDataOutputStream)create.invoke(fs, logPath, FsPermission.getDefault(), set, buffersize, replication, blockSize, null);
- } catch (ClassNotFoundException ex) {
- // Expected in hadoop 1.0
- return fs.create(logPath, b, buffersize, replication, blockSize);
- } catch (Exception ex) {
- log.debug(ex, ex);
- return fs.create(logPath, b, buffersize, replication, blockSize);
- }
- }
-
/*
* (non-Javadoc)
*
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java?rev=1486202&r1=1486201&r2=1486202&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/tabletserver/log/LogSorter.java Fri May 24 19:59:11 2013
@@ -37,12 +37,12 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.fs.FileSystem;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.log4j.Logger;
@@ -104,7 +104,7 @@ public class LogSorter {
try {
// the following call does not throw an exception if the file/dir does not exist
- fs.delete(new Path(destPath), true);
+ fs.deleteRecursively(new Path(destPath));
FSDataInputStream tmpInput = fs.open(srcPath);
DataInputStream tmpDecryptingInput = tmpInput;
@@ -193,7 +193,8 @@ public class LogSorter {
private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey,LogFileValue>> buffer, int part) throws IOException {
String path = destPath + String.format("/part-r-%05d", part++);
- MapFile.Writer output = new MapFile.Writer(fs.getConf(), fs, path, LogFileKey.class, LogFileValue.class);
+ org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
+ MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path, LogFileKey.class, LogFileValue.class);
try {
Collections.sort(buffer, new Comparator<Pair<LogFileKey,LogFileValue>>() {
@Override