You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/06/19 22:18:32 UTC
svn commit: r1494759 [5/5] - in /accumulo/trunk:
core/src/main/java/org/apache/accumulo/core/client/admin/
core/src/main/java/org/apache/accumulo/core/client/impl/
core/src/main/java/org/apache/accumulo/core/client/mock/
core/src/main/java/org/apache/a...
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Wed Jun 19 20:18:30 2013
@@ -52,12 +52,11 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.server.util.FileUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.core.util.Pair;
@@ -70,14 +69,14 @@ import org.apache.accumulo.fate.zookeepe
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.security.SecurityConstants;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -151,8 +150,8 @@ public class MetadataTable extends org.a
* @param flushId
*
*/
- public static void updateTabletDataFile(KeyExtent extent, String path, String mergeFile, DataFileValue dfv, String time, TCredentials credentials,
- Set<String> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
+ public static void updateTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time, TCredentials credentials,
+ Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
if (extent.equals(RootTable.ROOT_TABLET_EXTENT)) {
if (unusedWalLogs != null) {
IZooReaderWriter zk = ZooReaderWriter.getInstance();
@@ -163,7 +162,7 @@ public class MetadataTable extends org.a
boolean foundEntry = false;
for (String entry : unusedWalLogs) {
String[] parts = entry.split("/");
- String zpath = root + "/" + parts[1];
+ String zpath = root + "/" + parts[parts.length - 1];
while (true) {
try {
if (zk.exists(zpath)) {
@@ -188,7 +187,7 @@ public class MetadataTable extends org.a
Mutation m = new Mutation(extent.getMetadataEntry());
if (dfv.getNumEntries() > 0) {
- m.put(DATAFILE_COLUMN_FAMILY, new Text(path), new Value(dfv.encode()));
+ m.put(DATAFILE_COLUMN_FAMILY, path.meta(), new Value(dfv.encode()));
TIME_COLUMN.put(m, new Value(time.getBytes()));
// stuff in this location
TServerInstance self = getTServerInstance(address, zooLock);
@@ -203,11 +202,11 @@ public class MetadataTable extends org.a
}
}
- for (String scanFile : filesInUseByScans)
- m.put(SCANFILE_COLUMN_FAMILY, new Text(scanFile), new Value("".getBytes()));
+ for (FileRef scanFile : filesInUseByScans)
+ m.put(SCANFILE_COLUMN_FAMILY, scanFile.meta(), new Value("".getBytes()));
if (mergeFile != null)
- m.putDelete(DATAFILE_COLUMN_FAMILY, new Text(mergeFile));
+ m.putDelete(DATAFILE_COLUMN_FAMILY, mergeFile.meta());
FLUSH_COLUMN.put(m, new Value((flushId + "").getBytes()));
@@ -244,12 +243,12 @@ public class MetadataTable extends org.a
}
}
- public static void updateTabletDataFile(long tid, KeyExtent extent, Map<String,DataFileValue> estSizes, String time, TCredentials credentials, ZooLock zooLock) {
+ public static void updateTabletDataFile(long tid, KeyExtent extent, Map<FileRef,DataFileValue> estSizes, String time, TCredentials credentials, ZooLock zooLock) {
Mutation m = new Mutation(extent.getMetadataEntry());
byte[] tidBytes = Long.toString(tid).getBytes();
- for (Entry<String,DataFileValue> entry : estSizes.entrySet()) {
- Text file = new Text(entry.getKey());
+ for (Entry<FileRef,DataFileValue> entry : estSizes.entrySet()) {
+ Text file = entry.getKey().meta();
m.put(DATAFILE_COLUMN_FAMILY, file, new Value(entry.getValue().encode()));
m.put(BULKFILE_COLUMN_FAMILY, file, new Value(tidBytes));
}
@@ -336,12 +335,13 @@ public class MetadataTable extends org.a
return false;
}
- public static SortedMap<String,DataFileValue> getDataFileSizes(KeyExtent extent, TCredentials credentials) {
- TreeMap<String,DataFileValue> sizes = new TreeMap<String,DataFileValue>();
+ public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, TCredentials credentials) throws IOException {
+ TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
Scanner mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, ID, Authorizations.EMPTY);
mdScanner.fetchColumnFamily(DATAFILE_COLUMN_FAMILY);
Text row = extent.getMetadataEntry();
+ VolumeManager fs = VolumeManagerImpl.get();
Key endKey = new Key(row, DATAFILE_COLUMN_FAMILY, new Text(""));
endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
@@ -352,14 +352,14 @@ public class MetadataTable extends org.a
if (!entry.getKey().getRow().equals(row))
break;
DataFileValue dfv = new DataFileValue(entry.getValue().get());
- sizes.put(entry.getKey().getColumnQualifier().toString(), dfv);
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
}
return sizes;
}
- public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map<String,DataFileValue> datafileSizes,
- Map<String,Long> bulkLoadedFiles, TCredentials credentials, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
+ public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map<FileRef,DataFileValue> datafileSizes,
+ Map<FileRef,Long> bulkLoadedFiles, TCredentials credentials, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
Mutation m = extent.getPrevRowUpdateMutation();
DIRECTORY_COLUMN.put(m, new Value(path.getBytes()));
@@ -374,13 +374,13 @@ public class MetadataTable extends org.a
m.putDelete(FUTURE_LOCATION_COLUMN_FAMILY, location.asColumnQualifier());
}
- for (Entry<String,DataFileValue> entry : datafileSizes.entrySet()) {
- m.put(DATAFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(entry.getValue().encode()));
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+ m.put(DATAFILE_COLUMN_FAMILY, entry.getKey().meta(), new Value(entry.getValue().encode()));
}
- for (Entry<String,Long> entry : bulkLoadedFiles.entrySet()) {
+ for (Entry<FileRef,Long> entry : bulkLoadedFiles.entrySet()) {
byte[] tidBytes = Long.toString(entry.getValue()).getBytes();
- m.put(BULKFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(tidBytes));
+ m.put(BULKFILE_COLUMN_FAMILY, entry.getKey().meta(), new Value(tidBytes));
}
update(credentials, zooLock, m);
@@ -404,36 +404,36 @@ public class MetadataTable extends org.a
update(credentials, zooLock, m);
}
- public static void finishSplit(Text metadataEntry, Map<String,DataFileValue> datafileSizes, List<String> highDatafilesToRemove, TCredentials credentials,
+ public static void finishSplit(Text metadataEntry, Map<FileRef,DataFileValue> datafileSizes, List<FileRef> highDatafilesToRemove, TCredentials credentials,
ZooLock zooLock) {
Mutation m = new Mutation(metadataEntry);
SPLIT_RATIO_COLUMN.putDelete(m);
OLD_PREV_ROW_COLUMN.putDelete(m);
CHOPPED_COLUMN.putDelete(m);
- for (Entry<String,DataFileValue> entry : datafileSizes.entrySet()) {
- m.put(DATAFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(entry.getValue().encode()));
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+ m.put(DATAFILE_COLUMN_FAMILY, entry.getKey().meta(), new Value(entry.getValue().encode()));
}
- for (String pathToRemove : highDatafilesToRemove) {
- m.putDelete(DATAFILE_COLUMN_FAMILY, new Text(pathToRemove));
+ for (FileRef pathToRemove : highDatafilesToRemove) {
+ m.putDelete(DATAFILE_COLUMN_FAMILY, pathToRemove.meta());
}
update(credentials, zooLock, m);
}
- public static void finishSplit(KeyExtent extent, Map<String,DataFileValue> datafileSizes, List<String> highDatafilesToRemove, TCredentials credentials,
+ public static void finishSplit(KeyExtent extent, Map<FileRef,DataFileValue> datafileSizes, List<FileRef> highDatafilesToRemove, TCredentials credentials,
ZooLock zooLock) {
finishSplit(extent.getMetadataEntry(), datafileSizes, highDatafilesToRemove, credentials, zooLock);
}
- public static void replaceDatafiles(KeyExtent extent, Set<String> datafilesToDelete, Set<String> scanFiles, String path, Long compactionId,
- DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock) {
+ public static void replaceDatafiles(KeyExtent extent, Set<FileRef> datafilesToDelete, Set<FileRef> scanFiles, FileRef path, Long compactionId,
+ DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock) throws IOException {
replaceDatafiles(extent, datafilesToDelete, scanFiles, path, compactionId, size, credentials, address, lastLocation, zooLock, true);
}
- public static void replaceDatafiles(KeyExtent extent, Set<String> datafilesToDelete, Set<String> scanFiles, String path, Long compactionId,
- DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock, boolean insertDeleteFlags) {
+ public static void replaceDatafiles(KeyExtent extent, Set<FileRef> datafilesToDelete, Set<FileRef> scanFiles, FileRef path, Long compactionId,
+ DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock, boolean insertDeleteFlags) throws IOException {
if (insertDeleteFlags) {
// add delete flags for those paths before the data file reference is removed
@@ -443,14 +443,14 @@ public class MetadataTable extends org.a
// replace data file references to old mapfiles with the new mapfiles
Mutation m = new Mutation(extent.getMetadataEntry());
- for (String pathToRemove : datafilesToDelete)
- m.putDelete(DATAFILE_COLUMN_FAMILY, new Text(pathToRemove));
+ for (FileRef pathToRemove : datafilesToDelete)
+ m.putDelete(DATAFILE_COLUMN_FAMILY, pathToRemove.meta());
- for (String scanFile : scanFiles)
- m.put(SCANFILE_COLUMN_FAMILY, new Text(scanFile), new Value("".getBytes()));
+ for (FileRef scanFile : scanFiles)
+ m.put(SCANFILE_COLUMN_FAMILY, scanFile.meta(), new Value("".getBytes()));
if (size.getNumEntries() > 0)
- m.put(DATAFILE_COLUMN_FAMILY, new Text(path), new Value(size.encode()));
+ m.put(DATAFILE_COLUMN_FAMILY, path.meta(), new Value(size.encode()));
if (compactionId != null)
COMPACT_COLUMN.put(m, new Value(("" + compactionId).getBytes()));
@@ -465,45 +465,49 @@ public class MetadataTable extends org.a
update(credentials, zooLock, m);
}
- public static void addDeleteEntries(KeyExtent extent, Set<String> datafilesToDelete, TCredentials credentials) {
+ public static void addDeleteEntries(KeyExtent extent, Set<FileRef> datafilesToDelete, TCredentials credentials) throws IOException {
String tableId = extent.getTableId().toString();
// TODO could use batch writer,would need to handle failure and retry like update does - ACCUMULO-1294
- for (String pathToRemove : datafilesToDelete)
- update(credentials, createDeleteMutation(tableId, pathToRemove));
+ for (FileRef pathToRemove : datafilesToDelete) {
+ update(credentials, createDeleteMutation(tableId, pathToRemove.path().toString()));
+ }
}
- public static void addDeleteEntry(String tableId, String path) {
+ public static void addDeleteEntry(String tableId, String path) throws IOException {
update(SecurityConstants.getSystemCredentials(), createDeleteMutation(tableId, path));
}
- public static Mutation createDeleteMutation(String tableId, String pathToRemove) {
- Mutation delFlag;
+ public static Mutation createDeleteMutation(String tableId, String pathToRemove) throws IOException {
String prefix = DELETE_FLAG_PREFIX;
if (tableId.equals(ID))
prefix = RootTable.DELETE_FLAG_PREFIX;
- if (pathToRemove.startsWith("../"))
- delFlag = new Mutation(new Text(prefix + pathToRemove.substring(2)));
- else
- delFlag = new Mutation(new Text(prefix + "/" + tableId + pathToRemove));
+ if (!pathToRemove.contains(":")) {
+ if (pathToRemove.startsWith("../"))
+ pathToRemove = pathToRemove.substring(2);
+ else
+ pathToRemove = "/" + tableId + "/" + pathToRemove;
+ }
+ Path path = VolumeManagerImpl.get().getFullPath(ServerConstants.getTablesDirs(), pathToRemove);
+ Mutation delFlag = new Mutation(new Text(prefix + path.toString()));
delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
return delFlag;
}
- public static void removeScanFiles(KeyExtent extent, Set<String> scanFiles, TCredentials credentials, ZooLock zooLock) {
+ public static void removeScanFiles(KeyExtent extent, Set<FileRef> scanFiles, TCredentials credentials, ZooLock zooLock) {
Mutation m = new Mutation(extent.getMetadataEntry());
- for (String pathToRemove : scanFiles)
- m.putDelete(SCANFILE_COLUMN_FAMILY, new Text(pathToRemove));
+ for (FileRef pathToRemove : scanFiles)
+ m.putDelete(SCANFILE_COLUMN_FAMILY, pathToRemove.meta());
update(credentials, zooLock, m);
}
private static KeyExtent fixSplit(Text table, Text metadataEntry, Text metadataPrevEndRow, Value oper, double splitRatio, TServerInstance tserver,
- TCredentials credentials, String time, long initFlushID, long initCompactID, ZooLock lock) throws AccumuloException {
+ TCredentials credentials, String time, long initFlushID, long initCompactID, ZooLock lock) throws AccumuloException, IOException {
if (metadataPrevEndRow == null)
// something is wrong, this should not happen... if a tablet is split, it will always have a
// prev end row....
@@ -515,6 +519,7 @@ public class MetadataTable extends org.a
ScannerImpl scanner2 = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, ID, Authorizations.EMPTY);
scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW)));
+ VolumeManager fs = VolumeManagerImpl.get();
if (!scanner2.iterator().hasNext()) {
log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow);
rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), credentials, lock);
@@ -522,24 +527,24 @@ public class MetadataTable extends org.a
} else {
log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow);
- List<String> highDatafilesToRemove = new ArrayList<String>();
+ List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
Scanner scanner3 = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, ID, Authorizations.EMPTY);
Key rowKey = new Key(metadataEntry);
- SortedMap<String,DataFileValue> origDatafileSizes = new TreeMap<String,DataFileValue>();
- SortedMap<String,DataFileValue> highDatafileSizes = new TreeMap<String,DataFileValue>();
- SortedMap<String,DataFileValue> lowDatafileSizes = new TreeMap<String,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> origDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
scanner3.fetchColumnFamily(DATAFILE_COLUMN_FAMILY);
scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW)));
for (Entry<Key,Value> entry : scanner3) {
if (entry.getKey().compareColumnFamily(DATAFILE_COLUMN_FAMILY) == 0) {
- origDatafileSizes.put(entry.getKey().getColumnQualifier().toString(), new DataFileValue(entry.getValue().get()));
+ origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get()));
}
}
- splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<String,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes, highDatafileSizes,
+ splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes, highDatafileSizes,
highDatafilesToRemove);
MetadataTable.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, credentials, lock);
@@ -549,11 +554,11 @@ public class MetadataTable extends org.a
}
- public static void splitDatafiles(Text table, Text midRow, double splitRatio, Map<String,FileUtil.FileInfo> firstAndLastRows,
- SortedMap<String,DataFileValue> datafiles, SortedMap<String,DataFileValue> lowDatafileSizes, SortedMap<String,DataFileValue> highDatafileSizes,
- List<String> highDatafilesToRemove) {
+ public static void splitDatafiles(Text table, Text midRow, double splitRatio, Map<FileRef,FileUtil.FileInfo> firstAndLastRows,
+ SortedMap<FileRef,DataFileValue> datafiles, SortedMap<FileRef,DataFileValue> lowDatafileSizes, SortedMap<FileRef,DataFileValue> highDatafileSizes,
+ List<FileRef> highDatafilesToRemove) {
- for (Entry<String,DataFileValue> entry : datafiles.entrySet()) {
+ for (Entry<FileRef,DataFileValue> entry : datafiles.entrySet()) {
Text firstRow = null;
Text lastRow = null;
@@ -593,7 +598,7 @@ public class MetadataTable extends org.a
}
public static KeyExtent fixSplit(Text metadataEntry, SortedMap<ColumnFQ,Value> columns, TServerInstance tserver, TCredentials credentials, ZooLock lock)
- throws AccumuloException {
+ throws AccumuloException, IOException {
log.info("Incomplete split " + metadataEntry + " attempting to fix");
Value oper = columns.get(OLD_PREV_ROW_COLUMN);
@@ -633,7 +638,7 @@ public class MetadataTable extends org.a
return fixSplit(table, metadataEntry, metadataPrevEndRow, oper, splitRatio, tserver, credentials, time.toString(), initFlushID, initCompactID, lock);
}
- public static void deleteTable(String tableId, boolean insertDeletes, TCredentials credentials, ZooLock lock) throws AccumuloException {
+ public static void deleteTable(String tableId, boolean insertDeletes, TCredentials credentials, ZooLock lock) throws AccumuloException, IOException {
Scanner ms = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, ID, Authorizations.EMPTY);
Text tableIdText = new Text(tableId);
BatchWriter bw = new BatchWriterImpl(HdfsZooInstance.getInstance(), credentials, ID, new BatchWriterConfig().setMaxMemory(1000000)
@@ -653,10 +658,8 @@ public class MetadataTable extends org.a
Key key = cell.getKey();
if (key.getColumnFamily().equals(DATAFILE_COLUMN_FAMILY)) {
- String relPath = key.getColumnQualifier().toString();
- // only insert deletes for files owned by this table
- if (!relPath.startsWith("../"))
- bw.addMutation(createDeleteMutation(tableId, relPath));
+ FileRef ref = new FileRef(VolumeManagerImpl.get(), key);
+ bw.addMutation(createDeleteMutation(tableId, ref.meta().toString()));
}
if (DIRECTORY_COLUMN.hasColumns(key)) {
@@ -715,7 +718,7 @@ public class MetadataTable extends org.a
extent.write(out);
out.writeLong(timestamp);
out.writeUTF(server);
- out.writeUTF(filename);
+ out.writeUTF(filename.toString());
out.write(tabletId);
out.write(logSet.size());
for (String s : logSet) {
@@ -752,8 +755,11 @@ public class MetadataTable extends org.a
while (true) {
try {
IZooReaderWriter zoo = ZooReaderWriter.getInstance();
- if (zoo.isLockHeld(zooLock.getLockID()))
- zoo.putPersistentData(root + "/" + entry.filename, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+ if (zoo.isLockHeld(zooLock.getLockID())) {
+ String[] parts = entry.filename.split("/");
+ String uniqueId = parts[parts.length - 1];
+ zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE);
+ }
break;
} catch (KeeperException e) {
log.error(e, e);
@@ -775,7 +781,7 @@ public class MetadataTable extends org.a
public static LogEntry entryFromKeyValue(Key key, Value value) {
MetadataTable.LogEntry e = new MetadataTable.LogEntry();
e.extent = new KeyExtent(key.getRow(), EMPTY_TEXT);
- String[] parts = key.getColumnQualifier().toString().split("/");
+ String[] parts = key.getColumnQualifier().toString().split("/", 2);
e.server = parts[0];
e.filename = parts[1];
parts = value.toString().split("\\|");
@@ -785,22 +791,23 @@ public class MetadataTable extends org.a
return e;
}
- public static Pair<List<LogEntry>,SortedMap<String,DataFileValue>> getFileAndLogEntries(TCredentials credentials, KeyExtent extent) throws KeeperException,
+ public static Pair<List<LogEntry>,SortedMap<FileRef,DataFileValue>> getFileAndLogEntries(TCredentials credentials, KeyExtent extent) throws KeeperException,
InterruptedException, IOException {
ArrayList<LogEntry> result = new ArrayList<LogEntry>();
- TreeMap<String,DataFileValue> sizes = new TreeMap<String,DataFileValue>();
+ TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
+ VolumeManager fs = VolumeManagerImpl.get();
if (extent.isRootTablet()) {
getRootLogEntries(result);
- FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
- FileStatus[] files = fs.listStatus(new Path(ServerConstants.getRootTabletDir()));
-
+ Path rootDir = new Path(ServerConstants.getRootTabletDir());
+ rootDir = rootDir.makeQualified(fs.getDefaultVolume());
+ FileStatus[] files = fs.listStatus(rootDir);
for (FileStatus fileStatus : files) {
if (fileStatus.getPath().toString().endsWith("_tmp")) {
continue;
}
DataFileValue dfv = new DataFileValue(0, 0);
- sizes.put(RootTable.ZROOT_TABLET + "/" + fileStatus.getPath().getName(), dfv);
+ sizes.put(new FileRef(fileStatus.getPath().toString(), fileStatus.getPath()), dfv);
}
} else {
@@ -818,14 +825,14 @@ public class MetadataTable extends org.a
result.add(entryFromKeyValue(entry.getKey(), entry.getValue()));
} else if (entry.getKey().getColumnFamily().equals(DATAFILE_COLUMN_FAMILY)) {
DataFileValue dfv = new DataFileValue(entry.getValue().get());
- sizes.put(entry.getKey().getColumnQualifier().toString(), dfv);
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
} else {
throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
}
}
}
- return new Pair<List<LogEntry>,SortedMap<String,DataFileValue>>(result, sizes);
+ return new Pair<List<LogEntry>,SortedMap<FileRef,DataFileValue>>(result, sizes);
}
public static List<LogEntry> getLogEntries(TCredentials credentials, KeyExtent extent) throws IOException, KeeperException, InterruptedException {
@@ -903,6 +910,8 @@ public class MetadataTable extends org.a
try {
Scanner scanner = HdfsZooInstance.getInstance().getConnector(creds.getPrincipal(), CredentialHelper.extractToken(creds))
.createScanner(NAME, Authorizations.EMPTY);
+ log.info("Setting range to " + KEYSPACE);
+ scanner.setRange(KEYSPACE);
scanner.fetchColumnFamily(LOG_COLUMN_FAMILY);
metadataEntries = scanner.iterator();
} catch (Exception ex) {
@@ -961,8 +970,9 @@ public class MetadataTable extends org.a
for (Entry<Key,Value> entry : tablet.entrySet()) {
if (entry.getKey().getColumnFamily().equals(DATAFILE_COLUMN_FAMILY)) {
String cf = entry.getKey().getColumnQualifier().toString();
- if (srcTableId != null && !cf.startsWith("../"))
+ if (srcTableId != null && !cf.startsWith("../") && !cf.contains(":")) {
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
+ }
files.add(cf);
}
}
@@ -976,7 +986,7 @@ public class MetadataTable extends org.a
for (Entry<Key,Value> entry : tablet.entrySet()) {
if (entry.getKey().getColumnFamily().equals(DATAFILE_COLUMN_FAMILY)) {
String cf = entry.getKey().getColumnQualifier().toString();
- if (!cf.startsWith("../"))
+ if (!cf.startsWith("../") && !cf.contains(":"))
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue());
} else if (entry.getKey().getColumnFamily().equals(CURRENT_LOCATION_COLUMN_FAMILY)) {
@@ -1177,15 +1187,16 @@ public class MetadataTable extends org.a
bw.close();
}
- public static List<String> getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) {
- List<String> result = new ArrayList<String>();
+ public static List<FileRef> getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) throws IOException {
+ List<FileRef> result = new ArrayList<FileRef>();
try {
+ VolumeManager fs = VolumeManagerImpl.get();
Scanner mscanner = new IsolatedScanner(conn.createScanner(NAME, Authorizations.EMPTY));
mscanner.setRange(extent.toMetadataRange());
mscanner.fetchColumnFamily(BULKFILE_COLUMN_FAMILY);
for (Entry<Key,Value> entry : mscanner) {
if (Long.parseLong(entry.getValue().toString()) == tid) {
- result.add(entry.getKey().getColumnQualifier().toString());
+ result.add(new FileRef(fs, entry.getKey()));
}
}
return result;
@@ -1195,22 +1206,21 @@ public class MetadataTable extends org.a
}
}
- public static Map<String,Long> getBulkFilesLoaded(TCredentials credentials, KeyExtent extent) {
+ public static Map<FileRef,Long> getBulkFilesLoaded(TCredentials credentials, KeyExtent extent) throws IOException {
return getBulkFilesLoaded(credentials, extent.getMetadataEntry());
}
- public static Map<String,Long> getBulkFilesLoaded(TCredentials credentials, Text metadataRow) {
+ public static Map<FileRef,Long> getBulkFilesLoaded(TCredentials credentials, Text metadataRow) throws IOException {
- Map<String,Long> ret = new HashMap<String,Long>();
+ Map<FileRef,Long> ret = new HashMap<FileRef,Long>();
+ VolumeManager fs = VolumeManagerImpl.get();
Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, ID, Authorizations.EMPTY);
scanner.setRange(new Range(metadataRow));
scanner.fetchColumnFamily(BULKFILE_COLUMN_FAMILY);
for (Entry<Key,Value> entry : scanner) {
- String file = entry.getKey().getColumnQualifier().toString();
Long tid = Long.parseLong(entry.getValue().toString());
-
- ret.put(file, tid);
+ ret.put(new FileRef(fs, entry.getKey()), tid);
}
return ret;
}
@@ -1237,7 +1247,7 @@ public class MetadataTable extends org.a
scanner.setRange(new Range(DELETES_KEYSPACE));
for (Entry<Key,Value> entry : scanner) {
String row = entry.getKey().getRow().toString();
- if (row.startsWith(DELETE_FLAG_PREFIX + "/" + ID)) {
+ if (row.startsWith(DELETE_FLAG_PREFIX)) {
String filename = row.substring(DELETE_FLAG_PREFIX.length());
// add the new entry first
log.info("Moving " + filename + " marker to the root tablet");
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java Wed Jun 19 20:18:30 2013
@@ -45,13 +45,14 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.iterators.system.VisibilityFilter;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.RootTable;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.util.MetadataTable.LogEntry;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -61,13 +62,14 @@ public class OfflineMetadataScanner exte
private Set<String> allFiles = new HashSet<String>();
private Range range = new Range();
- private final FileSystem fs;
+ private final VolumeManager fs;
private final AccumuloConfiguration conf;
- private List<SortedKeyValueIterator<Key,Value>> openMapFiles(Collection<String> files, FileSystem fs, AccumuloConfiguration conf) throws IOException {
+ private List<SortedKeyValueIterator<Key,Value>> openMapFiles(Collection<String> files, VolumeManager fs, AccumuloConfiguration conf) throws IOException {
List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
for (String file : files) {
- FileSKVIterator reader = FileOperations.getInstance().openReader(file, true, fs, fs.getConf(), conf);
+ FileSystem ns = fs.getFileSystemByPath(new Path(file));
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file, true, ns, ns.getConf(), conf);
readers.add(reader);
}
return readers;
@@ -118,7 +120,7 @@ public class OfflineMetadataScanner exte
}
- public OfflineMetadataScanner(AccumuloConfiguration conf, FileSystem fs) throws IOException {
+ public OfflineMetadataScanner(AccumuloConfiguration conf, VolumeManager fs) throws IOException {
super();
this.fs = fs;
this.conf = conf;
@@ -151,7 +153,7 @@ public class OfflineMetadataScanner exte
while (ssi.hasTop()) {
if (ssi.getTopKey().compareColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY) == 0) {
- allFiles.add(ServerConstants.getMetadataTableDir() + "/" + ssi.getTopKey().getColumnQualifier().toString());
+ allFiles.add(fs.getFullPath(ssi.getTopKey()).toString());
} else {
walogs++;
}
@@ -256,8 +258,8 @@ public class OfflineMetadataScanner exte
}
public static void main(String[] args) throws IOException {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
ServerConfiguration conf = new ServerConfiguration(HdfsZooInstance.getInstance());
+ VolumeManager fs = VolumeManagerImpl.get();
OfflineMetadataScanner scanner = new OfflineMetadataScanner(conf.getConfiguration(), fs);
scanner.setRange(MetadataTable.KEYSPACE);
for (Entry<Key,Value> entry : scanner)
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java Wed Jun 19 20:18:30 2013
@@ -18,20 +18,18 @@ package org.apache.accumulo.server.util;
import java.util.Map.Entry;
-import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.MetadataTable;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -54,7 +52,7 @@ public class RemoveEntriesForMissingFile
ScannerOpts scanOpts = new ScannerOpts();
BatchWriterOpts bwOpts = new BatchWriterOpts();
opts.parseArgs(RemoveEntriesForMissingFiles.class.getName(), args, scanOpts, bwOpts);
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+ VolumeManager fs = VolumeManagerImpl.get();
Connector connector = opts.getConnector();
Scanner metadata = connector.createScanner(MetadataTable.NAME, opts.auths);
metadata.setBatchSize(scanOpts.scanBatchSize);
@@ -68,11 +66,7 @@ public class RemoveEntriesForMissingFile
for (Entry<Key,Value> entry : metadata) {
count++;
Key key = entry.getKey();
- String table = new String(KeyExtent.tableOfMetadataRow(entry.getKey().getRow()));
- String file = key.getColumnQualifier().toString();
- if (!file.startsWith("/"))
- file = "/" + file;
- Path map = new Path(ServerConstants.getTablesDir() + "/" + table + file);
+ Path map = fs.getFullPath(key);
if (!fs.exists(map)) {
missing++;
log.info("File " + map + " is missing");
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TableDiskUsage.java Wed Jun 19 20:18:30 2013
@@ -16,24 +16,261 @@
*/
package org.apache.accumulo.server.util;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
-import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.NumUtil;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
import com.beust.jcommander.Parameter;
public class TableDiskUsage {
+
+ private static final Logger log = Logger.getLogger(Logger.class);
+ private int nextInternalId = 0;
+ private Map<String,Integer> internalIds = new HashMap<String,Integer>();
+ private Map<Integer,String> externalIds = new HashMap<Integer,String>();
+ private Map<String,Integer[]> tableFiles = new HashMap<String,Integer[]>();
+ private Map<String,Long> fileSizes = new HashMap<String,Long>();
+
+ void addTable(String tableId) {
+ if (internalIds.containsKey(tableId))
+ throw new IllegalArgumentException("Already added table " + tableId);
+
+ int iid = nextInternalId++;
+
+ internalIds.put(tableId, iid);
+ externalIds.put(iid, tableId);
+ }
+
+ void linkFileAndTable(String tableId, String file) {
+ int internalId = internalIds.get(tableId);
+
+ Integer[] tables = tableFiles.get(file);
+ if (tables == null) {
+ tables = new Integer[internalIds.size()];
+ for (int i = 0; i < tables.length; i++)
+ tables[i] = 0;
+ tableFiles.put(file, tables);
+ }
+
+ tables[internalId] = 1;
+ }
+
+ void addFileSize(String file, long size) {
+ fileSizes.put(file, size);
+ }
+
+ Map<List<String>,Long> calculateUsage() {
+
+ Map<List<Integer>,Long> usage = new HashMap<List<Integer>,Long>();
+
+ for (Entry<String,Integer[]> entry : tableFiles.entrySet()) {
+ log.info("fileSizes " + fileSizes + " key " + Arrays.asList(entry.getKey()));
+ List<Integer> key = Arrays.asList(entry.getValue());
+ Long size = fileSizes.get(entry.getKey());
+
+ Long tablesUsage = usage.get(key);
+ if (tablesUsage == null)
+ tablesUsage = 0l;
+
+ tablesUsage += size;
+
+ usage.put(key, tablesUsage);
+
+ }
+
+ Map<List<String>,Long> externalUsage = new HashMap<List<String>,Long>();
+
+ for (Entry<List<Integer>,Long> entry : usage.entrySet()) {
+ List<String> externalKey = new ArrayList<String>();
+ List<Integer> key = entry.getKey();
+ for (int i = 0; i < key.size(); i++)
+ if (key.get(i) != 0)
+ externalKey.add(externalIds.get(i));
+
+ externalUsage.put(externalKey, entry.getValue());
+ }
+
+ return externalUsage;
+ }
+
+ public interface Printer {
+ void print(String line);
+ }
+
+ public static void printDiskUsage(AccumuloConfiguration acuConf, Collection<String> tables, FileSystem fs, Connector conn, boolean humanReadable)
+ throws TableNotFoundException, IOException {
+ printDiskUsage(acuConf, tables, fs, conn, new Printer() {
+ @Override
+ public void print(String line) {
+ System.out.println(line);
+ }
+ }, humanReadable);
+ }
+
+ public static Map<TreeSet<String>,Long> getDiskUsage(AccumuloConfiguration acuConf, Set<String> tableIds, FileSystem fs, Connector conn)
+ throws IOException {
+ TableDiskUsage tdu = new TableDiskUsage();
+
+ for (String tableId : tableIds)
+ tdu.addTable(tableId);
+
+ HashSet<String> tablesReferenced = new HashSet<String>(tableIds);
+ HashSet<String> emptyTableIds = new HashSet<String>();
+
+ for (String tableId : tableIds) {
+ Scanner mdScanner = null;
+ try {
+ mdScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ mdScanner.fetchColumnFamily(MetadataTable.DATAFILE_COLUMN_FAMILY);
+ mdScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+
+ if (!mdScanner.iterator().hasNext()) {
+ emptyTableIds.add(tableId);
+ }
+
+ for (Entry<Key,Value> entry : mdScanner) {
+ String file = entry.getKey().getColumnQualifier().toString();
+ String parts[] = file.split("/");
+ String uniqueName = parts[parts.length - 1];
+ if (file.contains(":") || file.startsWith("../")) {
+ String ref = parts[parts.length - 3];
+ if (!ref.equals(tableId)) {
+ tablesReferenced.add(ref);
+ }
+ }
+
+ tdu.linkFileAndTable(tableId, uniqueName);
+ }
+ }
+
+ for (String tableId : tablesReferenced) {
+ for (String tableDir : ServerConstants.getTablesDirs()) {
+ FileStatus[] files = fs.globStatus(new Path(tableDir + "/" + tableId + "/*/*"));
+ if (files != null) {
+ for (FileStatus fileStatus : files) {
+ // Assumes that all filenames are unique
+ String name = fileStatus.getPath().getName();
+ tdu.addFileSize(name, fileStatus.getLen());
+ }
+ }
+ }
+ }
+
+ HashMap<String,String> reverseTableIdMap = new HashMap<String,String>();
+ for (Entry<String,String> entry : conn.tableOperations().tableIdMap().entrySet())
+ reverseTableIdMap.put(entry.getValue(), entry.getKey());
+
+ TreeMap<TreeSet<String>,Long> usage = new TreeMap<TreeSet<String>,Long>(new Comparator<TreeSet<String>>() {
+
+ @Override
+ public int compare(TreeSet<String> o1, TreeSet<String> o2) {
+ int len1 = o1.size();
+ int len2 = o2.size();
+
+ int min = Math.min(len1, len2);
+
+ Iterator<String> iter1 = o1.iterator();
+ Iterator<String> iter2 = o2.iterator();
+
+ int count = 0;
+
+ while (count < min) {
+ String s1 = iter1.next();
+ String s2 = iter2.next();
+
+ int cmp = s1.compareTo(s2);
+
+ if (cmp != 0)
+ return cmp;
+
+ count++;
+ }
+
+ return len1 - len2;
+ }
+ });
+
+ for (Entry<List<String>,Long> entry : tdu.calculateUsage().entrySet()) {
+ TreeSet<String> tableNames = new TreeSet<String>();
+ for (String tableId : entry.getKey())
+ tableNames.add(reverseTableIdMap.get(tableId));
+
+ usage.put(tableNames, entry.getValue());
+ }
+
+ if (!emptyTableIds.isEmpty()) {
+ TreeSet<String> emptyTables = new TreeSet<String>();
+ for (String tableId : emptyTableIds) {
+ emptyTables.add(reverseTableIdMap.get(tableId));
+ }
+ usage.put(emptyTables, 0L);
+ }
+
+ return usage;
+ }
+
+ public static void printDiskUsage(AccumuloConfiguration acuConf, Collection<String> tables, FileSystem fs, Connector conn, Printer printer,
+ boolean humanReadable) throws TableNotFoundException, IOException {
+
+ HashSet<String> tableIds = new HashSet<String>();
+
+ for (String tableName : tables) {
+ String tableId = conn.tableOperations().tableIdMap().get(tableName);
+ if (tableId == null)
+ throw new TableNotFoundException(null, tableName, "Table " + tableName + " not found");
+
+ tableIds.add(tableId);
+ }
+
+ Map<TreeSet<String>,Long> usage = getDiskUsage(acuConf, tableIds, fs, conn);
+
+ String valueFormat = humanReadable ? "%9s" : "%,24d";
+ for (Entry<TreeSet<String>,Long> entry : usage.entrySet()) {
+ Object value = humanReadable ? NumUtil.bigNumberForSize(entry.getValue()) : entry.getValue();
+ printer.print(String.format(valueFormat + " %s", value, entry.getKey()));
+ }
+ }
+
+
static class Opts extends ClientOpts {
@Parameter(description=" <table> { <table> ... } ")
List<String> tables = new ArrayList<String>();
}
-
+
/**
* @param args
*/
@@ -42,7 +279,7 @@ public class TableDiskUsage {
Opts opts = new Opts();
opts.parseArgs(TableDiskUsage.class.getName(), args);
Connector conn = opts.getConnector();
- org.apache.accumulo.core.util.TableDiskUsage.printDiskUsage(DefaultConfiguration.getInstance(), opts.tables, fs, conn, false);
+ org.apache.accumulo.server.util.TableDiskUsage.printDiskUsage(DefaultConfiguration.getInstance(), opts.tables, fs, conn, false);
}
}
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java Wed Jun 19 20:18:30 2013
@@ -19,10 +19,11 @@ package org.apache.accumulo.server.util;
import java.io.IOException;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -31,32 +32,33 @@ public class TabletOperations {
private static final Logger log = Logger.getLogger(TabletOperations.class);
- public static String createTabletDirectory(FileSystem fs, String tableDir, Text endRow) {
+ public static String createTabletDirectory(VolumeManager fs, String tableId, Text endRow) {
String lowDirectory;
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+ String volume = fs.choose(ServerConstants.getTablesDirs());
while (true) {
try {
if (endRow == null) {
lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
- Path lowDirectoryPath = new Path(tableDir + lowDirectory);
+ Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath))
- return lowDirectory;
+ return lowDirectoryPath.makeQualified(fs.getFileSystemByPath(lowDirectoryPath)).toString();
log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
} else {
lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
- Path lowDirectoryPath = new Path(tableDir + lowDirectory);
+ Path lowDirectoryPath = new Path(volume + "/" + tableId + "/" + lowDirectory);
if (fs.exists(lowDirectoryPath))
throw new IllegalStateException("Dir exist when it should not " + lowDirectoryPath);
if (fs.mkdirs(lowDirectoryPath))
- return lowDirectory;
+ return lowDirectoryPath.makeQualified(fs.getFileSystemByPath(lowDirectoryPath)).toString();
}
} catch (IOException e) {
log.warn(e);
}
- log.warn("Failed to create dir for tablet in table " + tableDir + " will retry ...");
+ log.warn("Failed to create dir for tablet in table " + tableId + " in volume " + volume + " + will retry ...");
UtilWaitThread.sleep(3000);
}
@@ -65,7 +67,7 @@ public class TabletOperations {
public static String createTabletDirectory(String tableDir, Text endRow) {
while (true) {
try {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+ VolumeManager fs = VolumeManagerImpl.get();
return createTabletDirectory(fs, tableDir, endRow);
} catch (IOException e) {
log.warn(e);
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java Wed Jun 19 20:18:30 2013
@@ -41,7 +41,7 @@ public class ZooKeeperMain {
Opts opts = new Opts();
opts.parseArgs(ZooKeeperMain.class.getName(), args);
FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
- String baseDir = ServerConstants.getBaseDir();
+ String baseDir = ServerConstants.getBaseDirs()[0];
System.out.println("Using " + fs.makeQualified(new Path(baseDir + "/instance_id")) + " to lookup accumulo instance");
Instance instance = HdfsZooInstance.getInstance();
if (opts.servers == null) {
Modified: accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java (original)
+++ accumulo/trunk/server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java Wed Jun 19 20:18:30 2013
@@ -34,9 +34,9 @@ import org.apache.accumulo.core.data.Val
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.thrift.TCredentials;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.MetadataTable;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;
@@ -94,7 +94,7 @@ public class TestConfirmDeletes {
TCredentials auth = CredentialHelper.create("root", new PasswordToken(new byte[0]), "instance");
Instance instance = new MockInstance();
- FileSystem fs = FileSystem.getLocal(CachedConfiguration.getInstance());
+ VolumeManager fs = VolumeManagerImpl.getLocal();
load(instance, metadata, deletes);
Modified: accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java (original)
+++ accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/MultiReaderTest.java Wed Jun 19 20:18:30 2013
@@ -16,13 +16,14 @@
*/
package org.apache.accumulo.server.tabletserver.log;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.tabletserver.log.MultiReader;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -33,27 +34,31 @@ import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
public class MultiReaderTest {
- Configuration conf = CachedConfiguration.getInstance();
- FileSystem fs;
+ VolumeManager fs;
+ TemporaryFolder root = new TemporaryFolder();
@Before
public void setUp() throws Exception {
// quiet log messages about compress.CodecPool
Logger.getRootLogger().setLevel(Level.ERROR);
- fs = FileSystem.getLocal(conf);
- Path root = new Path("manyMaps");
+ fs = VolumeManagerImpl.getLocal();
+ root.create();
+ String path = root.getRoot().getAbsolutePath();
+ Path root = new Path("file://" + path + "/manyMaps");
fs.mkdirs(root);
fs.create(new Path(root, "finished")).close();
- Writer writer = new Writer(conf, fs, "manyMaps/odd", IntWritable.class, BytesWritable.class);
+ FileSystem ns = fs.getDefaultVolume();
+ Writer writer = new Writer(ns.getConf(), ns, new Path(root, "odd").toString(), IntWritable.class, BytesWritable.class);
BytesWritable value = new BytesWritable("someValue".getBytes());
for (int i = 1; i < 1000; i += 2) {
writer.append(new IntWritable(i), value);
}
writer.close();
- writer = new Writer(conf, fs, "manyMaps/even", IntWritable.class, BytesWritable.class);
+ writer = new Writer(ns.getConf(), ns, new Path(root, "even").toString(), IntWritable.class, BytesWritable.class);
for (int i = 0; i < 1000; i += 2) {
if (i == 10)
continue;
@@ -64,8 +69,7 @@ public class MultiReaderTest {
@After
public void tearDown() throws Exception {
- if (fs != null)
- fs.delete(new Path("manyMaps"), true);
+ root.create();
}
private void scan(MultiReader reader, int start) throws IOException {
@@ -92,7 +96,8 @@ public class MultiReaderTest {
@Test
public void testMultiReader() throws IOException {
- MultiReader reader = new MultiReader(fs, conf, "manyMaps");
+ Path manyMaps = new Path("file://" + root.getRoot().getAbsolutePath() + "/manyMaps");
+ MultiReader reader = new MultiReader(fs, manyMaps);
IntWritable key = new IntWritable();
BytesWritable value = new BytesWritable();
@@ -121,8 +126,8 @@ public class MultiReaderTest {
assertEquals(0, key.get());
reader.close();
- fs.delete(new Path("manyMaps/even"), true);
- reader = new MultiReader(fs, conf, "manyMaps");
+ fs.deleteRecursively(new Path(manyMaps, "even"));
+ reader = new MultiReader(fs, manyMaps);
key.set(501);
assertTrue(reader.seek(key));
scanOdd(reader, 501);
Modified: accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java (original)
+++ accumulo/trunk/server/src/test/java/org/apache/accumulo/server/tabletserver/log/SortedLogRecoveryTest.java Wed Jun 19 20:18:30 2013
@@ -29,28 +29,27 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
-import java.util.Map.Entry;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.logger.LogEvents;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
-import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
-import org.apache.accumulo.server.tabletserver.log.SortedLogRecovery;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapFile.Writer;
+import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
public class SortedLogRecoveryTest {
@@ -115,29 +114,31 @@ public class SortedLogRecoveryTest {
}
private static List<Mutation> recover(Map<String,KeyValue[]> logs, Set<String> files, KeyExtent extent) throws IOException {
- final String workdir = "workdir";
- Configuration conf = CachedConfiguration.getInstance();
- FileSystem local = FileSystem.getLocal(conf).getRaw();
- local.delete(new Path(workdir), true);
- ArrayList<String> dirs = new ArrayList<String>();
+ TemporaryFolder root = new TemporaryFolder();
+ root.create();
+ final String workdir = "file://" + root.getRoot().getAbsolutePath() + "/workdir";
+ VolumeManager fs = VolumeManagerImpl.getLocal();
+ fs.deleteRecursively(new Path(workdir));
+ ArrayList<Path> dirs = new ArrayList<Path>();
try {
for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
String path = workdir + "/" + entry.getKey();
- Writer map = new MapFile.Writer(conf, local, path + "/log1", LogFileKey.class, LogFileValue.class);
+ FileSystem ns = fs.getFileSystemByPath(new Path(path));
+ Writer map = new MapFile.Writer(ns.getConf(), ns, path + "/log1", LogFileKey.class, LogFileValue.class);
for (KeyValue lfe : entry.getValue()) {
map.append(lfe.key, lfe.value);
}
map.close();
- local.create(new Path(path, "finished")).close();
- dirs.add(path);
+ ns.create(new Path(path, "finished")).close();
+ dirs.add(new Path(path));
}
// Recover
- SortedLogRecovery recovery = new SortedLogRecovery();
+ SortedLogRecovery recovery = new SortedLogRecovery(fs);
CaptureMutations capture = new CaptureMutations();
recovery.recover(extent, dirs, files, capture);
return capture.result;
} finally {
- local.delete(new Path(workdir), true);
+ root.delete();
}
}
Modified: accumulo/trunk/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java (original)
+++ accumulo/trunk/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java Wed Jun 19 20:18:30 2013
@@ -22,8 +22,6 @@ import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
-import org.apache.accumulo.trace.instrument.Tracer;
-import org.apache.accumulo.server.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -42,11 +40,14 @@ import org.apache.accumulo.core.util.Cac
import org.apache.accumulo.core.util.MetadataTable;
import org.apache.accumulo.core.util.Stat;
import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.cli.ClientOnRequiredTable;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.monitor.Monitor;
import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus;
@@ -110,15 +111,21 @@ public class ContinuousStatsCollector {
}
private String getFSStats() throws Exception {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
- Path acudir = new Path(ServerConstants.getTablesDir());
- ContentSummary contentSummary = fs.getContentSummary(acudir);
-
- Path tableDir = new Path(ServerConstants.getTablesDir() + "/" + tableId);
- ContentSummary contentSummary2 = fs.getContentSummary(tableDir);
+ VolumeManager fs = VolumeManagerImpl.get();
+ long length1 = 0, dcount1 = 0, fcount1 = 0;
+ long length2 = 0, dcount2 = 0, fcount2 = 0;
+ for (String dir : ServerConstants.getTablesDirs()) {
+ ContentSummary contentSummary = fs.getContentSummary(new Path(dir));
+ length1 += contentSummary.getLength();
+ dcount1 += contentSummary.getDirectoryCount();
+ fcount1 += contentSummary.getFileCount();
+ contentSummary = fs.getContentSummary(new Path(dir, tableId));
+ length2 += contentSummary.getLength();
+ dcount2 += contentSummary.getDirectoryCount();
+ fcount2 += contentSummary.getFileCount();
+ }
- return "" + contentSummary.getLength() + " " + contentSummary.getDirectoryCount() + " " + contentSummary.getFileCount() + " "
- + contentSummary2.getLength() + " " + contentSummary2.getDirectoryCount() + " " + contentSummary2.getFileCount();
+ return "" + length1 + " " + dcount1 + " " + fcount1 + " " + length2 + " " + dcount2 + " " + fcount2;
}
private String getACUStats() throws Exception {
Modified: accumulo/trunk/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java (original)
+++ accumulo/trunk/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java Wed Jun 19 20:18:30 2013
@@ -35,7 +35,6 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
@@ -45,12 +44,15 @@ import org.apache.accumulo.fate.zookeepe
import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.tabletserver.TabletServer;
import org.apache.accumulo.server.tabletserver.TabletTime;
+import org.apache.accumulo.server.util.FileUtil;
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -129,15 +131,15 @@ public class SplitRecoveryTest extends F
Text midRow = new Text(mr);
- SortedMap<String,DataFileValue> splitMapFiles = null;
+ SortedMap<FileRef,DataFileValue> splitMapFiles = null;
for (int i = 0; i < extents.length; i++) {
KeyExtent extent = extents[i];
- String tdir = "/dir_" + i;
+ String tdir = ServerConstants.getTablesDirs()[0] + "/" + extent.getTableId().toString() + "/dir_" + i;
MetadataTable.addTablet(extent, tdir, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID, zl);
- SortedMap<String,DataFileValue> mapFiles = new TreeMap<String,DataFileValue>();
- mapFiles.put(tdir + "/" + RFile.EXTENSION + "_000_000", new DataFileValue(1000017 + i, 10000 + i));
+ SortedMap<FileRef,DataFileValue> mapFiles = new TreeMap<FileRef,DataFileValue>();
+ mapFiles.put(new FileRef(tdir + "/" + RFile.EXTENSION + "_000_000"), new DataFileValue(1000017 + i, 10000 + i));
if (i == extentToSplit) {
splitMapFiles = mapFiles;
@@ -155,14 +157,14 @@ public class SplitRecoveryTest extends F
splitPartiallyAndRecover(extent, high, low, .4, splitMapFiles, midRow, "localhost:1234", failPoint, zl);
}
- private void splitPartiallyAndRecover(KeyExtent extent, KeyExtent high, KeyExtent low, double splitRatio, SortedMap<String,DataFileValue> mapFiles,
+ private void splitPartiallyAndRecover(KeyExtent extent, KeyExtent high, KeyExtent low, double splitRatio, SortedMap<FileRef,DataFileValue> mapFiles,
Text midRow, String location, int steps, ZooLock zl) throws Exception {
- SortedMap<String,DataFileValue> lowDatafileSizes = new TreeMap<String,DataFileValue>();
- SortedMap<String,DataFileValue> highDatafileSizes = new TreeMap<String,DataFileValue>();
- List<String> highDatafilesToRemove = new ArrayList<String>();
+ SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
- MetadataTable.splitDatafiles(extent.getTableId(), midRow, splitRatio, new HashMap<String,FileUtil.FileInfo>(), mapFiles, lowDatafileSizes,
+ MetadataTable.splitDatafiles(extent.getTableId(), midRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), mapFiles, lowDatafileSizes,
highDatafileSizes, highDatafilesToRemove);
MetadataTable.splitTablet(high, extent.getPrevEndRow(), splitRatio, SecurityConstants.getSystemCredentials(), zl);
@@ -174,7 +176,7 @@ public class SplitRecoveryTest extends F
writer.update(m);
if (steps >= 1) {
- Map<String,Long> bulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent);
+ Map<FileRef,Long> bulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent);
MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, bulkFiles, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID
+ "0", -1l, -1l, zl);
}
@@ -187,8 +189,8 @@ public class SplitRecoveryTest extends F
ensureTabletHasNoUnexpectedMetadataEntries(low, lowDatafileSizes);
ensureTabletHasNoUnexpectedMetadataEntries(high, highDatafileSizes);
- Map<String,Long> lowBulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), low);
- Map<String,Long> highBulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), high);
+ Map<FileRef,Long> lowBulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), low);
+ Map<FileRef,Long> highBulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), high);
if (!lowBulkFiles.equals(highBulkFiles)) {
throw new Exception(" " + lowBulkFiles + " != " + highBulkFiles + " " + low + " " + high);
@@ -202,7 +204,7 @@ public class SplitRecoveryTest extends F
}
}
- private void ensureTabletHasNoUnexpectedMetadataEntries(KeyExtent extent, SortedMap<String,DataFileValue> expectedMapFiles) throws Exception {
+ private void ensureTabletHasNoUnexpectedMetadataEntries(KeyExtent extent, SortedMap<FileRef,DataFileValue> expectedMapFiles) throws Exception {
Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), MetadataTable.ID, Authorizations.EMPTY);
scanner.setRange(extent.toMetadataRange());
@@ -242,17 +244,17 @@ public class SplitRecoveryTest extends F
throw new Exception("Not all expected columns seen " + extent + " " + expectedColumns);
}
- SortedMap<String,DataFileValue> fixedMapFiles = MetadataTable.getDataFileSizes(extent, SecurityConstants.getSystemCredentials());
+ SortedMap<FileRef,DataFileValue> fixedMapFiles = MetadataTable.getDataFileSizes(extent, SecurityConstants.getSystemCredentials());
verifySame(expectedMapFiles, fixedMapFiles);
}
- private void verifySame(SortedMap<String,DataFileValue> datafileSizes, SortedMap<String,DataFileValue> fixedDatafileSizes) throws Exception {
+ private void verifySame(SortedMap<FileRef,DataFileValue> datafileSizes, SortedMap<FileRef,DataFileValue> fixedDatafileSizes) throws Exception {
if (!datafileSizes.keySet().containsAll(fixedDatafileSizes.keySet()) || !fixedDatafileSizes.keySet().containsAll(datafileSizes.keySet())) {
throw new Exception("Key sets not the same " + datafileSizes.keySet() + " != " + fixedDatafileSizes.keySet());
}
- for (Entry<String,DataFileValue> entry : datafileSizes.entrySet()) {
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
DataFileValue dfv = entry.getValue();
DataFileValue otherDfv = fixedDatafileSizes.get(entry.getKey());
Modified: accumulo/trunk/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java (original)
+++ accumulo/trunk/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java Wed Jun 19 20:18:30 2013
@@ -63,14 +63,14 @@ import org.apache.accumulo.core.security
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Stat;
-import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.cli.ClientOnRequiredTable;
import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.util.MetadataTable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -105,7 +105,7 @@ public class CollectTabletStats {
columnsTmp = opts.columns.split(",");
final String columns[] = columnsTmp;
- final FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+ final VolumeManager fs = VolumeManagerImpl.get();
Instance instance = opts.getInstance();
final ServerConfiguration sconf = new ServerConfiguration(instance);
@@ -126,10 +126,10 @@ public class CollectTabletStats {
List<KeyExtent> tabletsToTest = selectRandomTablets(opts.numThreads, candidates);
- Map<KeyExtent,List<String>> tabletFiles = new HashMap<KeyExtent,List<String>>();
+ Map<KeyExtent,List<FileRef>> tabletFiles = new HashMap<KeyExtent,List<FileRef>>();
for (KeyExtent ke : tabletsToTest) {
- List<String> files = getTabletFiles(CredentialHelper.create(opts.principal, opts.getToken(), opts.instance), opts.getInstance(), tableId, ke);
+ List<FileRef> files = getTabletFiles(CredentialHelper.create(opts.principal, opts.getToken(), opts.instance), opts.getInstance(), tableId, ke);
tabletFiles.put(ke, files);
}
@@ -155,7 +155,7 @@ public class CollectTabletStats {
ArrayList<Test> tests = new ArrayList<Test>();
for (final KeyExtent ke : tabletsToTest) {
- final List<String> files = tabletFiles.get(ke);
+ final List<FileRef> files = tabletFiles.get(ke);
Test test = new Test(ke) {
public int runTest() throws Exception {
return readFiles(fs, sconf.getConfiguration(), files, ke, columns);
@@ -174,7 +174,7 @@ public class CollectTabletStats {
ArrayList<Test> tests = new ArrayList<Test>();
for (final KeyExtent ke : tabletsToTest) {
- final List<String> files = tabletFiles.get(ke);
+ final List<FileRef> files = tabletFiles.get(ke);
Test test = new Test(ke) {
public int runTest() throws Exception {
return readFilesUsingIterStack(fs, sconf, files, opts.auths, ke, columns, false);
@@ -191,7 +191,7 @@ public class CollectTabletStats {
ArrayList<Test> tests = new ArrayList<Test>();
for (final KeyExtent ke : tabletsToTest) {
- final List<String> files = tabletFiles.get(ke);
+ final List<FileRef> files = tabletFiles.get(ke);
Test test = new Test(ke) {
public int runTest() throws Exception {
return readFilesUsingIterStack(fs, sconf, files, opts.auths, ke, columns, true);
@@ -372,29 +372,23 @@ public class CollectTabletStats {
return tabletsToTest;
}
- private static List<String> getTabletFiles(TCredentials token, Instance zki, String tableId, KeyExtent ke) {
- List<String> files = new ArrayList<String>();
-
- for (String cq : MetadataTable.getDataFileSizes(ke, token).keySet()) {
- files.add(ServerConstants.getTablesDir() + "/" + tableId + cq);
- }
- return files;
+ private static List<FileRef> getTabletFiles(TCredentials token, Instance zki, String tableId, KeyExtent ke) throws IOException {
+ return new ArrayList<FileRef>(MetadataTable.getDataFileSizes(ke, token).keySet());
}
- private static void reportHdfsBlockLocations(List<String> files) throws Exception {
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
+ private static void reportHdfsBlockLocations(List<FileRef> files) throws Exception {
+ VolumeManager fs = VolumeManagerImpl.get();
System.out.println("\t\tFile block report : ");
- for (String file : files) {
- FileStatus status = fs.getFileStatus(new Path(file));
+ for (FileRef file : files) {
+ FileStatus status = fs.getFileStatus(file.path());
if (status.isDir()) {
// assume it is a map file
status = fs.getFileStatus(new Path(file + "/data"));
}
-
- BlockLocation[] locs = fs.getFileBlockLocations(status, 0, status.getLen());
+ FileSystem ns = fs.getFileSystemByPath(file.path());
+ BlockLocation[] locs = ns.getFileBlockLocations(status, 0, status.getLen());
System.out.println("\t\t\tBlocks for : " + file);
@@ -433,14 +427,15 @@ public class CollectTabletStats {
return visFilter;
}
- private static int readFiles(FileSystem fs, AccumuloConfiguration aconf, List<String> files, KeyExtent ke, String[] columns) throws Exception {
+ private static int readFiles(VolumeManager fs, AccumuloConfiguration aconf, List<FileRef> files, KeyExtent ke, String[] columns) throws Exception {
int count = 0;
HashSet<ByteSequence> columnSet = createColumnBSS(columns);
- for (String file : files) {
- FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, fs.getConf(), aconf);
+ for (FileRef file : files) {
+ FileSystem ns = fs.getFileSystemByPath(file.path());
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), aconf);
Range range = new Range(ke.getPrevEndRow(), false, ke.getEndRow(), true);
reader.seek(range, columnSet, columnSet.size() == 0 ? false : true);
while (reader.hasTop() && !range.afterEndKey(reader.getTopKey())) {
@@ -461,7 +456,7 @@ public class CollectTabletStats {
return columnSet;
}
- private static int readFilesUsingIterStack(FileSystem fs, ServerConfiguration aconf, List<String> files, Authorizations auths, KeyExtent ke, String[] columns,
+ private static int readFilesUsingIterStack(VolumeManager fs, ServerConfiguration aconf, List<FileRef> files, Authorizations auths, KeyExtent ke, String[] columns,
boolean useTableIterators)
throws Exception {
@@ -469,8 +464,9 @@ public class CollectTabletStats {
List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<SortedKeyValueIterator<Key,Value>>(files.size());
- for (String file : files) {
- readers.add(FileOperations.getInstance().openReader(file, false, fs, fs.getConf(), aconf.getConfiguration()));
+ for (FileRef file : files) {
+ FileSystem ns = fs.getFileSystemByPath(file.path());
+ readers.add(FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), aconf.getConfiguration()));
}
List<IterInfo> emptyIterinfo = Collections.emptyList();
Modified: accumulo/trunk/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java?rev=1494759&r1=1494758&r2=1494759&view=diff
==============================================================================
--- accumulo/trunk/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java (original)
+++ accumulo/trunk/test/src/test/java/org/apache/accumulo/test/ShellServerTest.java Wed Jun 19 20:18:30 2013
@@ -190,10 +190,10 @@ public class ShellServerTest {
exec("addsplits row5", true);
exec("config -t t -s table.split.threshold=345M", true);
exec("offline t", true);
- String export = folder.newFolder().toString();
+ String export = "file://" + folder.newFolder().toString();
exec("exporttable -t t " + export, true);
DistCp cp = newDistCp();
- String import_ = folder.newFolder().toString();
+ String import_ = "file://" +folder.newFolder().toString();
cp.run(new String[] {"-f", export + "/distcp.txt", import_});
exec("importtable t2 " + import_, true);
exec("config -t t2 -np", true, "345M", true);