You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/06/19 22:18:32 UTC
svn commit: r1494759 [2/5] - in /accumulo/trunk:
core/src/main/java/org/apache/accumulo/core/client/admin/
core/src/main/java/org/apache/accumulo/core/client/impl/
core/src/main/java/org/apache/accumulo/core/client/mock/
core/src/main/java/org/apache/a...
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Jun 19 20:18:30 2013
@@ -66,7 +66,6 @@ import org.apache.accumulo.core.data.Par
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.thrift.TKeyExtent;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.master.thrift.MasterClientService;
@@ -84,7 +83,6 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
import org.apache.accumulo.core.util.ByteBufferUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -101,6 +99,9 @@ import org.apache.accumulo.server.Accumu
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.balancer.DefaultLoadBalancer;
import org.apache.accumulo.server.master.balancer.TabletBalancer;
@@ -144,7 +145,6 @@ import org.apache.accumulo.server.securi
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.util.AddressUtil;
import org.apache.accumulo.server.util.DefaultMap;
import org.apache.accumulo.server.util.Halt;
@@ -159,7 +159,6 @@ import org.apache.accumulo.server.zookee
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.trace.instrument.thrift.TraceWrap;
import org.apache.accumulo.trace.thrift.TInfo;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -193,7 +192,7 @@ public class Master implements LiveTServ
final private static int MAX_TSERVER_WORK_CHUNK = 5000;
final private static int MAX_BAD_STATUS_COUNT = 3;
- final private FileSystem fs;
+ final private VolumeManager fs;
final private Instance instance;
final private String hostname;
final private LiveTServerSet tserverSet;
@@ -452,10 +451,10 @@ public class Master implements LiveTServ
return instance;
}
- public Master(ServerConfiguration config, FileSystem fs, String hostname) throws IOException {
+ public Master(ServerConfiguration config, VolumeManager fs, String hostname) throws IOException {
this.serverConfig = config;
this.instance = config.getInstance();
- this.fs = TraceFileSystem.wrap(fs);
+ this.fs = fs;
this.hostname = hostname;
AccumuloConfiguration aconf = serverConfig.getConfiguration();
@@ -1575,11 +1574,11 @@ public class Master implements LiveTServ
MetadataTable.TIME_COLUMN.fetch(scanner);
scanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
scanner.fetchColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY);
- Set<String> datafiles = new TreeSet<String>();
+ Set<FileRef> datafiles = new TreeSet<FileRef>();
for (Entry<Key,Value> entry : scanner) {
Key key = entry.getKey();
if (key.compareColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY) == 0) {
- datafiles.add(key.getColumnQualifier().toString());
+ datafiles.add(new FileRef(fs, key));
if (datafiles.size() > 1000) {
MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
datafiles.clear();
@@ -1589,7 +1588,7 @@ public class Master implements LiveTServ
} else if (key.compareColumnFamily(MetadataTable.CURRENT_LOCATION_COLUMN_FAMILY) == 0) {
throw new IllegalStateException("Tablet " + key.getRow() + " is assigned during a merge!");
} else if (MetadataTable.DIRECTORY_COLUMN.hasColumns(key)) {
- datafiles.add(entry.getValue().toString());
+ datafiles.add(new FileRef(fs, key));
if (datafiles.size() > 1000) {
MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
datafiles.clear();
@@ -2228,7 +2227,7 @@ public class Master implements LiveTServ
try {
SecurityUtil.serverLogin();
- FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+ VolumeManager fs = VolumeManagerImpl.get();
String hostname = Accumulo.getLocalAddress(args);
Instance instance = HdfsZooInstance.getInstance();
ServerConfiguration conf = new ServerConfiguration(instance);
@@ -2380,7 +2379,7 @@ public class Master implements LiveTServ
return serverConfig;
}
- public FileSystem getFileSystem() {
+ public VolumeManager getFileSystem() {
return this.fs;
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/HadoopLogCloser.java Wed Jun 19 20:18:30 2013
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,10 +33,10 @@ public class HadoopLogCloser implements
private static Logger log = Logger.getLogger(HadoopLogCloser.class);
@Override
- public long close(Master master, FileSystem fs, Path source) throws IOException {
-
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem dfs = (DistributedFileSystem) fs;
+ public long close(Master master, VolumeManager fs, Path source) throws IOException {
+ FileSystem ns = fs.getFileSystemByPath(source);
+ if (ns instanceof DistributedFileSystem) {
+ DistributedFileSystem dfs = (DistributedFileSystem) ns;
try {
if (!dfs.recoverLease(source)) {
log.info("Waiting for file to be closed " + source.toString());
@@ -48,12 +49,12 @@ public class HadoopLogCloser implements
} catch (Exception ex) {
log.warn("Error recovery lease on " + source.toString(), ex);
}
- } else if (fs instanceof LocalFileSystem) {
+ } else if (ns instanceof LocalFileSystem) {
// ignore
} else {
throw new IllegalStateException("Don't know how to recover a lease for " + fs.getClass().getName());
}
- fs.append(source).close();
+ ns.append(source).close();
log.info("Recovered lease on " + source.toString() + " using append");
return 0;
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/LogCloser.java Wed Jun 19 20:18:30 2013
@@ -19,9 +19,9 @@ package org.apache.accumulo.server.maste
import java.io.IOException;
import org.apache.accumulo.server.master.Master;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.Path;
public interface LogCloser {
- public long close(Master master, FileSystem fs, Path path) throws IOException;
+ public long close(Master master, VolumeManager fs, Path path) throws IOException;
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/MapRLogCloser.java Wed Jun 19 20:18:30 2013
@@ -19,6 +19,7 @@ package org.apache.accumulo.server.maste
import java.io.IOException;
import org.apache.accumulo.server.master.Master;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -29,11 +30,12 @@ public class MapRLogCloser implements Lo
private static Logger log = Logger.getLogger(MapRLogCloser.class);
@Override
- public long close(Master m, FileSystem fs, Path path) throws IOException {
+ public long close(Master m, VolumeManager fs, Path path) throws IOException {
log.info("Recovering file " + path.toString() + " by changing permission to readonly");
+ FileSystem ns = fs.getFileSystemByPath(path);
FsPermission roPerm = new FsPermission((short) 0444);
try {
- fs.setPermission(path, roPerm);
+ ns.setPermission(path, roPerm);
return 0;
} catch (IOException ex) {
log.error("error recovering lease ", ex);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/recovery/RecoveryManager.java Wed Jun 19 20:18:30 2013
@@ -36,10 +36,8 @@ import org.apache.accumulo.core.util.Nam
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.master.Master;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.server.zookeeper.ZooCache;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
@@ -68,40 +66,39 @@ public class RecoveryManager {
}
private class LogSortTask implements Runnable {
- private String filename;
- private String host;
+ private String source;
+ private String destination;
+ private String sortId;
private LogCloser closer;
- public LogSortTask(LogCloser closer, String host, String filename) {
+ public LogSortTask(LogCloser closer, String source, String destination, String sortId) {
this.closer = closer;
- this.host = host;
- this.filename = filename;
+ this.source = source;
+ this.destination = destination;
+ this.sortId = sortId;
}
@Override
public void run() {
boolean rescheduled = false;
try {
- FileSystem localFs = master.getFileSystem();
- if (localFs instanceof TraceFileSystem)
- localFs = ((TraceFileSystem) localFs).getImplementation();
- long time = closer.close(master, localFs, getSource(host, filename));
+ long time = closer.close(master, master.getFileSystem(), new Path(source));
if (time > 0) {
executor.schedule(this, time, TimeUnit.MILLISECONDS);
rescheduled = true;
} else {
- initiateSort(host, filename);
+ initiateSort(sortId, source, destination);
}
} catch (FileNotFoundException e) {
- log.debug("Unable to initate log sort for " + filename + ": " + e);
+ log.debug("Unable to initate log sort for " + source + ": " + e);
} catch (Exception e) {
- log.warn("Failed to initiate log sort " + filename, e);
+ log.warn("Failed to initiate log sort " + source, e);
} finally {
if (!rescheduled) {
synchronized (RecoveryManager.this) {
- closeTasksQueued.remove(filename);
+ closeTasksQueued.remove(sortId);
}
}
}
@@ -109,61 +106,57 @@ public class RecoveryManager {
}
- private void initiateSort(String host, final String file) throws KeeperException, InterruptedException {
- String source = getSource(host, file).toString();
- new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(file, source.getBytes());
+ private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException {
+ String work = source + "|" + destination;
+ new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes());
synchronized (this) {
- sortsQueued.add(file);
+ sortsQueued.add(sortId);
}
- final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + file;
- log.info("Created zookeeper entry " + path + " with data " + source);
- }
-
- private Path getSource(String server, String file) {
- String source = ServerConstants.getWalDirectory() + "/" + server + "/" + file;
- if (server.contains(":")) {
- // old-style logger log, copied from local file systems by tservers, unsorted into the wal base dir
- source = ServerConstants.getWalDirectory() + "/" + file;
- }
- return new Path(source);
+ final String path = ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId;
+ log.info("Created zookeeper entry " + path + " with data " + work);
}
public boolean recoverLogs(KeyExtent extent, Collection<Collection<String>> walogs) throws IOException {
boolean recoveryNeeded = false;
+ ;
for (Collection<String> logs : walogs) {
for (String walog : logs) {
- String parts[] = walog.split("/");
- String host = parts[0];
- String filename = parts[1];
+ String hostFilename[] = walog.split("/", 2);
+ String host = hostFilename[0];
+ String filename = hostFilename[1];
+ String parts[] = filename.split("/");
+ String sortId = parts[parts.length - 1];
+ String dest = master.getFileSystem().choose(ServerConstants.getRecoveryDirs()) + "/" + sortId;
+ log.debug("Recovering " + filename + " to " + dest);
boolean sortQueued;
synchronized (this) {
- sortQueued = sortsQueued.contains(filename);
+ sortQueued = sortsQueued.contains(sortId);
}
- if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + filename) == null) {
+ if (sortQueued && zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY + "/" + sortId) == null) {
synchronized (this) {
- sortsQueued.remove(filename);
+ sortsQueued.remove(sortId);
}
}
-
- if (master.getFileSystem().exists(new Path(ServerConstants.getRecoveryDir() + "/" + filename + "/finished"))) {
+
+ if (master.getFileSystem().exists(new Path(dest, "finished"))) {
synchronized (this) {
- closeTasksQueued.remove(filename);
- recoveryDelay.remove(filename);
- sortsQueued.remove(filename);
+ closeTasksQueued.remove(sortId);
+ recoveryDelay.remove(sortId);
+ sortsQueued.remove(sortId);
}
continue;
}
recoveryNeeded = true;
synchronized (this) {
- if (!closeTasksQueued.contains(filename) && !sortsQueued.contains(filename)) {
+ if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) {
AccumuloConfiguration aconf = master.getConfiguration().getConfiguration();
LogCloser closer = Master.createInstanceFromPropertyName(aconf, Property.MASTER_WALOG_CLOSER_IMPLEMETATION, LogCloser.class, new HadoopLogCloser());
- Long delay = recoveryDelay.get(filename);
+ Long delay = recoveryDelay.get(sortId);
if (delay == null) {
delay = master.getSystemConfiguration().getTimeInMillis(Property.MASTER_RECOVERY_DELAY);
} else {
@@ -172,9 +165,9 @@ public class RecoveryManager {
log.info("Starting recovery of " + filename + " (in : " + (delay / 1000) + "s) created for " + host + ", tablet " + extent + " holds a reference");
- executor.schedule(new LogSortTask(closer, host, filename), delay, TimeUnit.MILLISECONDS);
- closeTasksQueued.add(filename);
- recoveryDelay.put(filename, delay);
+ executor.schedule(new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS);
+ closeTasksQueued.add(sortId);
+ recoveryDelay.put(sortId, delay);
}
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/state/SetGoalState.java Wed Jun 19 20:18:30 2013
@@ -17,17 +17,15 @@
package org.apache.accumulo.server.master.state;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.master.thrift.MasterGoalState;
import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.FileSystem;
public class SetGoalState {
@@ -41,7 +39,7 @@ public class SetGoalState {
}
SecurityUtil.serverLogin();
- FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+ VolumeManager fs = VolumeManagerImpl.get();
Accumulo.waitForZookeeperAndHdfs(fs);
ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(HdfsZooInstance.getInstance()) + Constants.ZMASTER_GOAL_STATE, args[0].getBytes(),
NodeExistsPolicy.OVERWRITE);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Wed Jun 19 20:18:30 2013
@@ -62,6 +62,7 @@ import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -75,7 +76,6 @@ import org.apache.accumulo.trace.instrum
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
@@ -146,7 +146,7 @@ public class BulkImport extends MasterRe
Utils.getReadLock(tableId, tid).lock();
// check that the error directory exists and is empty
- FileSystem fs = master.getFileSystem();
+ VolumeManager fs = master.getFileSystem();
Path errorPath = new Path(errorDir);
FileStatus errorStatus = null;
@@ -179,8 +179,23 @@ public class BulkImport extends MasterRe
}
}
- private Path createNewBulkDir(FileSystem fs, String tableId) throws IOException {
- Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableId);
+ private Path createNewBulkDir(VolumeManager fs, String tableId) throws IOException {
+ String tableDir = null;
+ loop:
+ for (String dir : fs.getFileSystems().keySet()) {
+ if (this.sourceDir.startsWith(dir)) {
+ for (String path : ServerConstants.getTablesDirs()) {
+ if (path.startsWith(dir)) {
+ tableDir = path;
+ break loop;
+ }
+ }
+ break;
+ }
+ }
+ if (tableDir == null)
+ throw new IllegalStateException(sourceDir + " is not in a known namespace");
+ Path directory = new Path(tableDir + "/" + tableId);
fs.mkdirs(directory);
// only one should be able to create the lock file
@@ -203,7 +218,7 @@ public class BulkImport extends MasterRe
}
}
- private String prepareBulkImport(FileSystem fs, String dir, String tableId) throws IOException {
+ private String prepareBulkImport(VolumeManager fs, String dir, String tableId) throws IOException {
Path bulkDir = createNewBulkDir(fs, tableId);
MetadataTable.addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
@@ -369,7 +384,7 @@ class CopyFailed extends MasterRepo {
public Repo<Master> call(long tid, Master master) throws Exception {
// This needs to execute after the arbiter is stopped
- FileSystem fs = master.getFileSystem();
+ VolumeManager fs = master.getFileSystem();
if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
return new CleanUpBulkImport(tableId, source, bulk, error);
@@ -440,7 +455,7 @@ class CopyFailed extends MasterRepo {
bifCopyQueue.waitUntilDone(workIds);
}
- fs.delete(new Path(error, BulkImport.FAILURES_TXT), true);
+ fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
return new CleanUpBulkImport(tableId, source, bulk, error);
}
@@ -490,7 +505,7 @@ class LoadFiles extends MasterRepo {
public Repo<Master> call(final long tid, final Master master) throws Exception {
initializeThreadPool(master);
final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
- FileSystem fs = master.getFileSystem();
+ VolumeManager fs = master.getFileSystem();
List<FileStatus> files = new ArrayList<FileStatus>();
for (FileStatus entry : fs.listStatus(new Path(bulk))) {
files.add(entry);
@@ -500,12 +515,12 @@ class LoadFiles extends MasterRepo {
Path writable = new Path(this.errorDir, ".iswritable");
if (!fs.createNewFile(writable)) {
// Maybe this is a re-try... clear the flag and try again
- fs.delete(writable, false);
+ fs.delete(writable);
if (!fs.createNewFile(writable))
throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
"Unable to write to " + this.errorDir);
}
- fs.delete(writable, false);
+ fs.delete(writable);
final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
for (FileStatus f : files)
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/CreateTable.java Wed Jun 19 20:18:30 2013
@@ -27,25 +27,21 @@ import org.apache.accumulo.core.client.i
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.tabletserver.TabletTime;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.util.TablePropUtil;
import org.apache.accumulo.server.util.TabletOperations;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -148,18 +144,19 @@ class CreateDir extends MasterRepo {
}
@Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
- FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
- String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
- TabletOperations.createTabletDirectory(fs, dir, null);
+ public Repo<Master> call(long tid, Master master) throws Exception {
+ VolumeManager fs = master.getFileSystem();
+ TabletOperations.createTabletDirectory(fs, tableInfo.tableId, null);
return new PopulateMetadata(tableInfo);
}
@Override
- public void undo(long tid, Master environment) throws Exception {
- FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
- String dir = ServerConstants.getTablesDir() + "/" + tableInfo.tableId;
- fs.delete(new Path(dir), true);
+ public void undo(long tid, Master master) throws Exception {
+ VolumeManager fs = master.getFileSystem();
+ for(String dir : ServerConstants.getTablesDirs()) {
+ fs.deleteRecursively(new Path(dir + "/" + tableInfo.tableId));
+ }
+
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/DeleteTable.java Wed Jun 19 20:18:30 2013
@@ -34,9 +34,9 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.iterators.user.GrepIterator;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.MetaDataTableScanner;
import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -46,7 +46,6 @@ import org.apache.accumulo.server.proble
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -168,8 +167,10 @@ class CleanUp extends MasterRepo {
if (refCount == 0) {
// delete the map files
try {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
- fs.delete(new Path(ServerConstants.getTablesDir(), tableId), true);
+ VolumeManager fs = master.getFileSystem();
+ for (String dir : ServerConstants.getTablesDirs()) {
+ fs.deleteRecursively(new Path(dir, tableId));
+ }
} catch (IOException e) {
log.error("Unable to remove deleted table directory", e);
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ExportTable.java Wed Jun 19 20:18:30 2013
@@ -22,7 +22,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
-import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -52,9 +51,9 @@ import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.Master;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -141,7 +140,7 @@ class WriteExportFiles extends MasterRep
Utils.unreserveTable(tableInfo.tableID, tid, false);
}
- public static void exportTable(FileSystem fs, Connector conn, String tableName, String tableID, String exportDir) throws Exception {
+ public static void exportTable(VolumeManager fs, Connector conn, String tableName, String tableID, String exportDir) throws Exception {
fs.mkdirs(new Path(exportDir));
@@ -171,7 +170,7 @@ class WriteExportFiles extends MasterRep
exportConfig(conn, tableID, zipOut, dataOut);
dataOut.flush();
- Map<String,String> uniqueFiles = exportMetadata(conn, tableID, zipOut, dataOut);
+ Map<String,String> uniqueFiles = exportMetadata(fs, conn, tableID, zipOut, dataOut);
dataOut.close();
dataOut = null;
@@ -184,24 +183,16 @@ class WriteExportFiles extends MasterRep
}
}
- private static void createDistcpFile(FileSystem fs, String exportDir, Path exportMetaFilePath, Map<String,String> uniqueFiles) throws IOException {
+ private static void createDistcpFile(VolumeManager fs, String exportDir, Path exportMetaFilePath, Map<String,String> uniqueFiles) throws IOException {
BufferedWriter distcpOut = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(exportDir, "distcp.txt"), false)));
try {
- URI uri = fs.getUri();
-
- for (String relPath : uniqueFiles.values()) {
- Path absPath = new Path(uri.getScheme(), uri.getAuthority(), ServerConstants.getTablesDir() + relPath);
- distcpOut.append(absPath.toUri().toString());
+ for (String file : uniqueFiles.values()) {
+ distcpOut.append(file);
distcpOut.newLine();
}
- Path absEMP = exportMetaFilePath;
- if (!exportMetaFilePath.isAbsolute())
- absEMP = new Path(fs.getWorkingDirectory().toUri().getPath(), exportMetaFilePath);
-
- distcpOut.append(new Path(uri.getScheme(), uri.getAuthority(), absEMP.toString()).toUri().toString());
-
+ distcpOut.append(exportMetaFilePath.toString());
distcpOut.newLine();
distcpOut.close();
@@ -213,7 +204,7 @@ class WriteExportFiles extends MasterRep
}
}
- private static Map<String,String> exportMetadata(Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
+ private static Map<String,String> exportMetadata(VolumeManager fs, Connector conn, String tableID, ZipOutputStream zipOut, DataOutputStream dataOut) throws IOException,
TableNotFoundException {
zipOut.putNextEntry(new ZipEntry(Constants.EXPORT_METADATA_FILE));
@@ -230,24 +221,18 @@ class WriteExportFiles extends MasterRep
entry.getValue().write(dataOut);
if (entry.getKey().getColumnFamily().equals(MetadataTable.DATAFILE_COLUMN_FAMILY)) {
- String relPath = entry.getKey().getColumnQualifierData().toString();
-
- if (relPath.startsWith("../"))
- relPath = relPath.substring(2);
- else
- relPath = "/" + tableID + relPath;
-
- String tokens[] = relPath.split("/");
- if (tokens.length != 4) {
- throw new RuntimeException("Illegal path " + relPath);
+ String path = fs.getFullPath(entry.getKey()).toString();
+ String tokens[] = path.split("/");
+ if (tokens.length < 1) {
+ throw new RuntimeException("Illegal path " + path);
}
- String filename = tokens[3];
+ String filename = tokens[tokens.length - 1];
String existingPath = uniqueFiles.get(filename);
if (existingPath == null) {
- uniqueFiles.put(filename, relPath);
- } else if (!existingPath.equals(relPath)) {
+ uniqueFiles.put(filename, path);
+ } else if (!existingPath.equals(path)) {
// make sure file names are unique, should only apply for tables with file names generated by Accumulo 1.3 and earlier
throw new IOException("Cannot export table with nonunique file names " + filename + ". Major compact table.");
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java Wed Jun 19 20:18:30 2013
@@ -52,6 +52,7 @@ import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.tables.TableManager;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -98,7 +99,7 @@ class FinishImportTable extends MasterRe
@Override
public Repo<Master> call(long tid, Master env) throws Exception {
- env.getFileSystem().delete(new Path(tableInfo.importDir, "mappings.txt"), true);
+ env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir, "mappings.txt"));
TableManager.getInstance().transitionTableState(tableInfo.tableId, TableState.ONLINE);
@@ -136,7 +137,7 @@ class MoveExportedFiles extends MasterRe
@Override
public Repo<Master> call(long tid, Master master) throws Exception {
try {
- FileSystem fs = master.getFileSystem();
+ VolumeManager fs = master.getFileSystem();
Map<String,String> fileNameMappings = PopulateMetadataTable.readMappingFile(fs, tableInfo);
@@ -175,7 +176,7 @@ class PopulateMetadataTable extends Mast
this.tableInfo = ti;
}
- static Map<String,String> readMappingFile(FileSystem fs, ImportedTableInfo tableInfo) throws Exception {
+ static Map<String,String> readMappingFile(VolumeManager fs, ImportedTableInfo tableInfo) throws Exception {
BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(tableInfo.importDir, "mappings.txt"))));
try {
@@ -203,7 +204,7 @@ class PopulateMetadataTable extends Mast
ZipInputStream zis = null;
try {
- FileSystem fs = master.getFileSystem();
+ VolumeManager fs = master.getFileSystem();
mbw = master.getConnector().createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
@@ -311,10 +312,10 @@ class MapImportFileNames extends MasterR
BufferedWriter mappingsWriter = null;
try {
- FileSystem fs = environment.getFileSystem();
+ VolumeManager fs = environment.getFileSystem();
fs.mkdirs(new Path(tableInfo.importDir));
-
+
FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
@@ -323,7 +324,7 @@ class MapImportFileNames extends MasterR
for (FileStatus fileStatus : files) {
String fileName = fileStatus.getPath().getName();
-
+ log.info("filename " + fileStatus.getPath().toString());
String sa[] = fileName.split("\\.");
String extension = "";
if (sa.length > 1) {
@@ -365,7 +366,7 @@ class MapImportFileNames extends MasterR
@Override
public void undo(long tid, Master env) throws Exception {
- env.getFileSystem().delete(new Path(tableInfo.importDir), true);
+ env.getFileSystem().deleteRecursively(new Path(tableInfo.importDir));
}
}
@@ -380,11 +381,12 @@ class CreateImportDir extends MasterRepo
}
@Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
+ public Repo<Master> call(long tid, Master master) throws Exception {
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
- Path directory = new Path(ServerConstants.getTablesDir() + "/" + tableInfo.tableId);
+ Path base = master.getFileSystem().matchingFileSystem(new Path(tableInfo.exportDir), ServerConstants.getTablesDirs());
+ Path directory = new Path(base, tableInfo.tableId);
Path newBulkDir = new Path(directory, Constants.BULK_PREFIX + namer.getNextName());
@@ -409,12 +411,13 @@ class ImportPopulateZookeeper extends Ma
return Utils.reserveTable(tableInfo.tableId, tid, true, false, TableOperation.IMPORT);
}
- private Map<String,String> getExportedProps(FileSystem fs) throws Exception {
+ private Map<String,String> getExportedProps(VolumeManager fs) throws Exception {
Path path = new Path(tableInfo.exportDir, Constants.EXPORT_FILE);
try {
- return TableOperationsImpl.getExportedProps(fs, path);
+ FileSystem ns = fs.getFileSystemByPath(path);
+ return TableOperationsImpl.getExportedProps(ns, path);
} catch (IOException ioe) {
throw new ThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, TableOperation.IMPORT, TableOperationExceptionType.OTHER,
"Error reading table props from " + path + " " + ioe.getMessage());
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java Wed Jun 19 20:18:30 2013
@@ -16,24 +16,28 @@
*/
package org.apache.accumulo.server.master.tableOps;
-import org.apache.accumulo.core.Constants;
+import java.util.Map.Entry;
+
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.fate.Repo;
-import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.master.Master;
import org.apache.accumulo.server.master.state.MergeInfo;
import org.apache.accumulo.server.master.state.MergeInfo.Operation;
import org.apache.accumulo.server.master.state.MergeState;
import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
/**
@@ -58,13 +62,14 @@ class MakeDeleteEntries extends MasterRe
public Repo<Master> call(long tid, Master master) throws Exception {
log.info("creating delete entries for merged metadata tablets");
Connector conn = master.getConnector();
+ Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ scanner.setRange(RootTable.KEYSPACE);
+ scanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- String tableDir = ServerConstants.getMetadataTableDir();
- for (FileStatus fs : master.getFileSystem().listStatus(new Path(tableDir))) {
+ for (Entry<Key,Value> entry : scanner) {
// TODO: add the entries only if there are no !METADATA table references - ACCUMULO-1308
- if (fs.isDir() && fs.getPath().getName().matches("^" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + ".*")) {
- bw.addMutation(MetadataTable.createDeleteMutation(MetadataTable.ID, "/" + fs.getPath().getName()));
- }
+ FileRef ref = new FileRef(master.getFileSystem(), entry.getKey());
+ bw.addMutation(MetadataTable.createDeleteMutation(MetadataTable.ID, ref.path().toString()));
}
bw.close();
return null;
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/Monitor.java Wed Jun 19 20:18:30 2013
@@ -27,13 +27,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.MasterClient;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.gc.thrift.GCMonitorService;
import org.apache.accumulo.core.gc.thrift.GCStatus;
import org.apache.accumulo.core.master.thrift.Compacting;
@@ -42,7 +40,6 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.security.SecurityUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.core.util.LoggingRunnable;
import org.apache.accumulo.core.util.Pair;
@@ -54,6 +51,8 @@ import org.apache.accumulo.core.zookeepe
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.monitor.servlets.DefaultServlet;
import org.apache.accumulo.server.monitor.servlets.GcStatusServlet;
import org.apache.accumulo.server.monitor.servlets.JSONServlet;
@@ -73,7 +72,7 @@ import org.apache.accumulo.server.proble
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.EmbeddedWebServer;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -450,7 +449,7 @@ public class Monitor {
public static void main(String[] args) throws Exception {
SecurityUtil.serverLogin();
- FileSystem fs = FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration());
+ VolumeManager fs = VolumeManagerImpl.get();
String hostname = Accumulo.getLocalAddress(args);
instance = HdfsZooInstance.getInstance();
config = new ServerConfiguration(instance);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java Wed Jun 19 20:18:30 2013
@@ -28,8 +28,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
@@ -55,15 +53,18 @@ import org.apache.accumulo.core.util.Loc
import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReportingIterator;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.tabletserver.Tablet.MajorCompactionReason;
import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
+import org.apache.accumulo.trace.instrument.Span;
+import org.apache.accumulo.trace.instrument.Trace;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -118,14 +119,14 @@ public class Compactor implements Callab
IteratorScope getIteratorScope();
}
- private Map<String,DataFileValue> filesToCompact;
+ private Map<FileRef,DataFileValue> filesToCompact;
private InMemoryMap imm;
- private String outputFile;
+ private FileRef outputFile;
private boolean propogateDeletes;
private TableConfiguration acuTableConf;
private CompactionEnv env;
private Configuration conf;
- private FileSystem fs;
+ private VolumeManager fs;
protected KeyExtent extent;
private List<IteratorSetting> iterators;
@@ -217,9 +218,10 @@ public class Compactor implements Callab
iiList.add(new IterInfo(iterSetting.getPriority(), iterSetting.getIteratorClass(), iterSetting.getName()));
iterOptions.put(iterSetting.getName(), iterSetting.getOptions());
}
-
- return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, new ArrayList<String>(
- compactor.filesToCompact.keySet()), compactor.outputFile, type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
+ List<String> filesToCompact = new ArrayList<String>();
+ for (FileRef ref : compactor.filesToCompact.keySet())
+ filesToCompact.add(ref.toString());
+ return new ActiveCompaction(compactor.extent.toThrift(), System.currentTimeMillis() - compactor.startTime, filesToCompact, compactor.outputFile.toString(), type, reason, localityGroup, entriesRead, entriesWritten, iiList, iterOptions);
}
}
@@ -235,7 +237,7 @@ public class Compactor implements Callab
return compactions;
}
- Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+ Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting> iterators, MajorCompactionReason reason) {
this.extent = extent;
this.conf = conf;
@@ -252,12 +254,12 @@ public class Compactor implements Callab
startTime = System.currentTimeMillis();
}
- Compactor(Configuration conf, FileSystem fs, Map<String,DataFileValue> files, InMemoryMap imm, String outputFile, boolean propogateDeletes,
+ Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files, InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new ArrayList<IteratorSetting>(), null);
}
- public FileSystem getFileSystem() {
+ public VolumeManager getFileSystem() {
return fs;
}
@@ -266,7 +268,7 @@ public class Compactor implements Callab
}
String getOutputFile() {
- return outputFile;
+ return outputFile.toString();
}
@Override
@@ -282,7 +284,8 @@ public class Compactor implements Callab
try {
FileOperations fileFactory = FileOperations.getInstance();
- mfw = fileFactory.openWriter(outputFile, fs, conf, acuTableConf);
+ FileSystem ns = this.fs.getFileSystemByPath(outputFile.path());
+ mfw = fileFactory.openWriter(outputFile.path().toString(), ns, ns.getConf(), acuTableConf);
Map<String,Set<ByteSequence>> lGroups;
try {
@@ -314,7 +317,7 @@ public class Compactor implements Callab
// Verify the file, since hadoop 0.20.2 sometimes lies about the success of close()
try {
- FileSKVIterator openReader = fileFactory.openReader(outputFile, false, fs, conf, acuTableConf);
+ FileSKVIterator openReader = fileFactory.openReader(outputFile.path().toString(), false, ns, ns.getConf(), acuTableConf);
openReader.close();
} catch (IOException ex) {
log.error("Verification of successful compaction fails!!! " + extent + " " + outputFile, ex);
@@ -324,7 +327,7 @@ public class Compactor implements Callab
log.debug(String.format("Compaction %s %,d read | %,d written | %,6d entries/sec | %6.3f secs", extent, majCStats.getEntriesRead(),
majCStats.getEntriesWritten(), (int) (majCStats.getEntriesRead() / ((t2 - t1) / 1000.0)), (t2 - t1) / 1000.0));
- majCStats.setFileSize(fileFactory.getFileSize(outputFile, fs, conf, acuTableConf));
+ majCStats.setFileSize(fileFactory.getFileSize(outputFile.path().toString(), ns, ns.getConf(), acuTableConf));
return majCStats;
} catch (IOException e) {
log.error(e, e);
@@ -343,9 +346,8 @@ public class Compactor implements Callab
try {
mfw.close();
} finally {
- Path path = new Path(outputFile);
- if (!fs.delete(path, true))
- if (fs.exists(path))
+ if (!fs.deleteRecursively(outputFile.path()))
+ if (fs.exists(outputFile.path()))
log.error("Unable to delete " + outputFile);
}
}
@@ -359,18 +361,18 @@ public class Compactor implements Callab
List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(filesToCompact.size());
- for (String mapFile : filesToCompact.keySet()) {
+ for (FileRef mapFile : filesToCompact.keySet()) {
try {
FileOperations fileFactory = FileOperations.getInstance();
-
+ FileSystem fs = this.fs.getFileSystemByPath(mapFile.path());
FileSKVIterator reader;
- reader = fileFactory.openReader(mapFile, false, fs, conf, acuTableConf);
+ reader = fileFactory.openReader(mapFile.path().toString(), false, fs, conf, acuTableConf);
readers.add(reader);
- SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile, false, reader);
+ SortedKeyValueIterator<Key,Value> iter = new ProblemReportingIterator(extent.getTableId().toString(), mapFile.path().toString(), false, reader);
if (filesToCompact.get(mapFile).isTimeSet()) {
iter = new TimeSettingIterator(iter, filesToCompact.get(mapFile).getTime());
@@ -380,7 +382,7 @@ public class Compactor implements Callab
} catch (Throwable e) {
- ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile, e));
+ ProblemReports.getInstance().report(new ProblemReport(extent.getTableId().toString(), ProblemType.FILE_READ, mapFile.path().toString(), e));
log.warn("Some problem opening map file " + mapFile + " " + e.getMessage(), e);
// failed to open some map file... close the ones that were opened
@@ -462,7 +464,7 @@ public class Compactor implements Callab
} catch (IOException e) {
log.error(e, e);
}
- fs.delete(new Path(outputFile), true);
+ fs.deleteRecursively(outputFile.path());
} catch (Exception e) {
log.warn("Failed to delete Canceled compaction output file " + outputFile, e);
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/FileManager.java Wed Jun 19 20:18:30 2013
@@ -42,13 +42,17 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource;
import org.apache.accumulo.core.iterators.system.TimeSettingIterator;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
+import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReportingIterator;
import org.apache.accumulo.server.problems.ProblemReports;
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -99,7 +103,7 @@ public class FileManager {
private Semaphore filePermits;
- private FileSystem fs;
+ private VolumeManager fs;
// the data cache and index cache are allocated in
// TabletResourceManager and passed through the file opener to
@@ -158,7 +162,7 @@ public class FileManager {
* @param indexCache
* : underlying file can and should be able to handle a null cache
*/
- FileManager(ServerConfiguration conf, FileSystem fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
+ FileManager(ServerConfiguration conf, VolumeManager fs, int maxOpen, BlockCache dataCache, BlockCache indexCache) {
if (maxOpen <= 0)
throw new IllegalArgumentException("maxOpen <= 0");
@@ -239,8 +243,7 @@ public class FileManager {
}
private List<String> takeOpenFiles(Collection<String> files, List<FileSKVIterator> reservedFiles, Map<FileSKVIterator,String> readersReserved) {
- List<String> filesToOpen;
- filesToOpen = new LinkedList<String>(files);
+ List<String> filesToOpen = new LinkedList<String>(files);
for (Iterator<String> iterator = filesToOpen.iterator(); iterator.hasNext();) {
String file = iterator.next();
@@ -304,8 +307,10 @@ public class FileManager {
// open any files that need to be opened
for (String file : filesToOpen) {
try {
- // log.debug("Opening "+file);
- FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, fs.getConf(), conf.getTableConfiguration(table.toString()),
+ Path path = fs.getFullPath(ServerConstants.getTablesDirs(), file);
+ FileSystem ns = fs.getFileSystemByPath(path);
+ //log.debug("Opening "+file + " path " + path);
+ FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), conf.getTableConfiguration(table.toString()),
dataCache, indexCache);
reservedFiles.add(reader);
readersReserved.put(reader, file);
@@ -453,6 +458,13 @@ public class FileManager {
}
}
+ private List<FileSKVIterator> openFileRefs(Collection<FileRef> files) throws TooManyFilesException, IOException {
+ List<String> strings = new ArrayList<String>(files.size());
+ for (FileRef ref : files)
+ strings.add(ref.path().toString());
+ return openFiles(strings);
+ }
+
private List<FileSKVIterator> openFiles(Collection<String> files) throws TooManyFilesException, IOException {
// one tablet can not open more than maxOpen files, otherwise it could get stuck
// forever waiting on itself to release files
@@ -468,9 +480,9 @@ public class FileManager {
return newlyReservedReaders;
}
- synchronized List<InterruptibleIterator> openFiles(Map<String,DataFileValue> files, boolean detachable) throws IOException {
+ synchronized List<InterruptibleIterator> openFiles(Map<FileRef,DataFileValue> files, boolean detachable) throws IOException {
- List<FileSKVIterator> newlyReservedReaders = openFiles(files.keySet());
+ List<FileSKVIterator> newlyReservedReaders = openFileRefs(files.keySet());
ArrayList<InterruptibleIterator> iters = new ArrayList<InterruptibleIterator>();
@@ -485,9 +497,9 @@ public class FileManager {
} else {
iter = new ProblemReportingIterator(tablet.getTableId().toString(), filename, continueOnFailure, reader);
}
-
- if (files.get(filename).isTimeSet()) {
- iter = new TimeSettingIterator(iter, files.get(filename).getTime());
+ DataFileValue value = files.get(new FileRef(filename));
+ if (value.isTimeSet()) {
+ iter = new TimeSettingIterator(iter, value.getTime());
}
iters.add(iter);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/MinorCompactor.java Wed Jun 19 20:18:30 2013
@@ -34,7 +34,8 @@ import org.apache.accumulo.server.proble
import org.apache.accumulo.server.problems.ProblemType;
import org.apache.accumulo.server.tabletserver.Tablet.MinorCompactionReason;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -42,16 +43,16 @@ public class MinorCompactor extends Comp
private static final Logger log = Logger.getLogger(MinorCompactor.class);
- private static final Map<String,DataFileValue> EMPTY_MAP = Collections.emptyMap();
+ private static final Map<FileRef,DataFileValue> EMPTY_MAP = Collections.emptyMap();
- private static Map<String,DataFileValue> toFileMap(String mergeFile, DataFileValue dfv) {
+ private static Map<FileRef,DataFileValue> toFileMap(FileRef mergeFile, DataFileValue dfv) {
if (mergeFile == null)
return EMPTY_MAP;
return Collections.singletonMap(mergeFile, dfv);
}
- MinorCompactor(Configuration conf, FileSystem fs, InMemoryMap imm, String mergeFile, DataFileValue dfv, String outputFile, TableConfiguration acuTableConf,
+ MinorCompactor(Configuration conf, VolumeManager fs, InMemoryMap imm, FileRef mergeFile, DataFileValue dfv, FileRef outputFile, TableConfiguration acuTableConf,
KeyExtent extent, MinorCompactionReason mincReason) {
super(conf, fs, toFileMap(mergeFile, dfv), imm, outputFile, true, acuTableConf, extent, new CompactionEnv() {
@@ -126,7 +127,7 @@ public class MinorCompactor extends Comp
// clean up
try {
if (getFileSystem().exists(new Path(getOutputFile()))) {
- getFileSystem().delete(new Path(getOutputFile()), true);
+ getFileSystem().deleteRecursively(new Path(getOutputFile()));
}
} catch (IOException e) {
log.warn("Failed to delete failed MinC file " + getOutputFile() + " " + e.getMessage());