You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/06/05 19:19:26 UTC
svn commit: r1489969 [3/4] - in /accumulo/branches/ACCUMULO-118:
core/src/main/java/org/apache/accumulo/core/util/
proxy/src/test/java/org/apache/accumulo/proxy/
server/src/main/java/org/apache/accumulo/server/
server/src/main/java/org/apache/accumulo/...
Added: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java?rev=1489969&view=auto
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java (added)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java Wed Jun 5 17:19:25 2013
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.system.MultiIterator;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.LocalityGroupUtil;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+public class FileUtil {
+
+ /**
+  * Pairs the first and last key of a file, as supplied by the caller, and exposes their rows.
+  */
+ public static class FileInfo {
+   // No field initializers: the only constructor assigns both fields, so pre-allocating Key objects here would be wasted work.
+   Key firstKey;
+   Key lastKey;
+
+   /**
+    * @param firstKey
+    *          key whose row is reported by {@link #getFirstRow()}
+    * @param lastKey
+    *          key whose row is reported by {@link #getLastRow()}
+    */
+   public FileInfo(Key firstKey, Key lastKey) {
+     this.firstKey = firstKey;
+     this.lastKey = lastKey;
+   }
+
+   /**
+    * @return the row of the first key
+    */
+   public Text getFirstRow() {
+     return firstKey.getRow();
+   }
+
+   /**
+    * @return the row of the last key
+    */
+   public Text getLastRow() {
+     return lastKey.getRow();
+   }
+ }
+
+ private static final Logger log = Logger.getLogger(FileUtil.class);
+
+ private static String createTmpDir(AccumuloConfiguration acuConf, FileSystem fs) throws IOException {
+ String accumuloDir = acuConf.get(Property.INSTANCE_DFS_DIR);
+
+ String tmpDir = null;
+ while (tmpDir == null) {
+ tmpDir = accumuloDir + "/tmp/idxReduce_" + String.format("%09d", (int) (Math.random() * Integer.MAX_VALUE));
+
+ try {
+ fs.getFileStatus(new Path(tmpDir));
+ tmpDir = null;
+ continue;
+ } catch (FileNotFoundException fne) {
+ // found an unused temp directory
+ }
+
+ fs.mkdirs(new Path(tmpDir));
+
+ // try to reserve the tmp dir
+ if (!fs.createNewFile(new Path(tmpDir + "/__reserve")))
+ tmpDir = null;
+ }
+
+ return tmpDir;
+ }
+
+ /**
+  * Merges the indexes of the given files into temporary RFiles, at most {@code maxFiles} inputs per output, and recurses on the outputs until no more than
+  * {@code maxFiles} files remain. Only index keys whose row lies in (prevEndRow, endRow] are copied; each is written with an empty value.
+  *
+  * @param acuConf
+  *          configuration handed to the file readers and writers
+  * @param conf
+  *          not used here other than being forwarded to the next pass
+  * @param fs
+  *          used to resolve the underlying Hadoop file system of every input and output path
+  * @param prevEndRow
+  *          exclusive lower row bound, or null for no lower bound
+  * @param endRow
+  *          inclusive upper row bound, or null for no upper bound
+  * @param mapFiles
+  *          the files whose indexes should be merged
+  * @param maxFiles
+  *          maximum number of files to return, and maximum number of inputs merged into one output
+  * @param tmpDir
+  *          directory under which the per-pass output directories are created
+  * @param pass
+  *          pass number, used to name the output directory of this pass
+  * @return a copy of the input files if there are already at most maxFiles of them, otherwise the merged files of the final pass
+  * @throws IllegalArgumentException
+  *           if a reduction is required but maxFiles is less than 2, because merging could then never shrink the number of files
+  * @throws IOException
+  *           if reading an index or writing a merged file fails
+  */
+ public static Collection<FileRef> reduceFiles(AccumuloConfiguration acuConf, Configuration conf, FileSystem fs, Text prevEndRow, Text endRow,
+     Collection<FileRef> mapFiles, int maxFiles, String tmpDir, int pass) throws IOException {
+   ArrayList<FileRef> paths = new ArrayList<FileRef>(mapFiles);
+
+   if (paths.size() <= maxFiles)
+     return paths;
+
+   // With fewer than two inputs per output a pass cannot reduce the file count, so the loop/recursion below would never terminate.
+   if (maxFiles < 2)
+     throw new IllegalArgumentException("maxFiles must be at least 2 to reduce " + paths.size() + " files, was " + maxFiles);
+
+   String newDir = String.format("%s/pass_%04d", tmpDir, pass);
+
+   int start = 0;
+   int count = 0;
+   ArrayList<FileRef> outFiles = new ArrayList<FileRef>();
+
+   while (start < paths.size()) {
+     int end = Math.min(maxFiles + start, paths.size());
+     List<FileRef> inFiles = paths.subList(start, end);
+
+     start = end;
+
+     String newMapFile = String.format("%s/%04d." + RFile.EXTENSION, newDir, count++);
+
+     outFiles.add(new FileRef(newMapFile));
+     org.apache.hadoop.fs.FileSystem outNs = fs.getFileSystemByPath(newMapFile);
+     FileSKVWriter writer = new RFileOperations().openWriter(newMapFile, outNs, outNs.getConf(), acuConf);
+     List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
+
+     try {
+       // inside the try so the writer is still closed if this call fails
+       writer.startDefaultLocalityGroup();
+
+       for (FileRef s : inFiles) {
+         // each input is opened with the file system its own path resolves to, which need not be the one holding the output file
+         Path inPath = s.path();
+         org.apache.hadoop.fs.FileSystem inNs = fs.getFileSystemByPath(inPath);
+         iters.add(FileOperations.getInstance().openIndex(inPath.toString(), inNs, inNs.getConf(), acuConf));
+       }
+
+       MultiIterator mmfi = new MultiIterator(iters, true);
+
+       while (mmfi.hasTop()) {
+         Key key = mmfi.getTopKey();
+
+         boolean gtPrevEndRow = prevEndRow == null || key.compareRow(prevEndRow) > 0;
+         boolean lteEndRow = endRow == null || key.compareRow(endRow) <= 0;
+
+         if (gtPrevEndRow && lteEndRow)
+           writer.append(key, new Value(new byte[0]));
+
+         // keys arrive in sorted order, so nothing after this one can be in range
+         if (!lteEndRow)
+           break;
+
+         mmfi.next();
+       }
+     } finally {
+       // every opened reader is in iters, so each one is closed exactly once here
+       for (SortedKeyValueIterator<Key,Value> r : iters)
+         try {
+           if (r != null)
+             ((FileSKVIterator) r).close();
+         } catch (IOException e) {
+           // continue closing
+           log.error(e, e);
+         }
+
+       try {
+         writer.close();
+       } catch (IOException e) {
+         log.error(e, e);
+         throw e;
+       }
+     }
+   }
+
+   return reduceFiles(acuConf, conf, fs, prevEndRow, endRow, outFiles, maxFiles, tmpDir, pass + 1);
+ }
+
+ /**
+  * Convenience overload of the seven-argument {@code findMidPoint} that always starts with the file indexes.
+  */
+ public static SortedMap<Double,Key> findMidPoint(FileSystem fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles,
+     double minSplit) throws IOException {
+   final boolean startWithIndexes = true;
+   return findMidPoint(fs, acuConf, prevEndRow, endRow, mapFiles, minSplit, startWithIndexes);
+ }
+
+ /**
+  * Estimates, from the file indexes, the fraction of index entries in (prevEndRow, endRow] whose row is less than or equal to splitRow. When there are more
+  * files than {@code TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN} the indexes are first reduced into a temporary directory, which is removed before returning.
+  *
+  * @return .5 when the indexes hold no entries in the range, otherwise (numLte + 1) / (numKeys + 2), which is never exactly 0 or 1
+  */
+ public static double estimatePercentageLTE(FileSystem fs, AccumuloConfiguration acuconf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles,
+     Text splitRow) throws IOException {
+
+   final Configuration hadoopConf = CachedConfiguration.getInstance();
+   final int openLimit = acuconf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
+   final ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(mapFiles.size());
+
+   String tmpDir = null;
+   Collection<FileRef> files = mapFiles;
+
+   try {
+     if (files.size() > openLimit) {
+       tmpDir = createTmpDir(acuconf, fs);
+
+       log.debug("Too many indexes (" + files.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir);
+
+       long reduceStart = System.currentTimeMillis();
+       files = reduceFiles(acuconf, hadoopConf, fs, prevEndRow, endRow, files, openLimit, tmpDir, 0);
+       long reduceEnd = System.currentTimeMillis();
+
+       log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (reduceEnd - reduceStart) / 1000.0));
+     }
+
+     // from here on a missing lower bound is represented by an empty row
+     final Text lowerBound = prevEndRow == null ? new Text() : prevEndRow;
+
+     final long numKeys = countIndexEntries(acuconf, lowerBound, endRow, files, true, hadoopConf, fs, readers);
+
+     // not enough info in the index to answer the question, so instead of going to
+     // the data just punt and return .5
+     if (numKeys == 0)
+       return .5;
+
+     MultiIterator merged = new MultiIterator(new ArrayList<SortedKeyValueIterator<Key,Value>>(readers), true);
+
+     // step over everything at or before the lower bound
+     while (merged.hasTop() && merged.getTopKey().compareRow(lowerBound) <= 0)
+       merged.next();
+
+     int numLte = 0;
+     for (; merged.hasTop() && merged.getTopKey().compareRow(splitRow) <= 0; merged.next())
+       numLte++;
+
+     // something went wrong if more keys were seen here than were counted above
+     if (numLte > numKeys)
+       throw new RuntimeException("numLte > numKeys " + numLte + " " + numKeys + " " + lowerBound + " " + endRow + " " + splitRow + " " + files);
+
+     // do not want to return 0% or 100%, so add 1 and 2 below
+     return (numLte + 1) / (double) (numKeys + 2);
+
+   } finally {
+     cleanupIndexOp(acuconf, tmpDir, fs, readers);
+   }
+ }
+
+ /**
+ * Finds a key roughly in the middle of the entries whose row lies in (prevEndRow, endRow].
+ *
+ * When there are more files than TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN, the indexes are first reduced into a temporary directory (only possible when
+ * useIndex is true). If the indexes contain no entries in the range, a warning is logged and the search is repeated against the data of the original files.
+ * Opened readers and the temporary directory are cleaned up before returning.
+ *
+ * @param mapFiles
+ * - list MapFiles to find the mid point key
+ * @param minSplit
+ * - minimum fraction of the counted keys that must precede the optional "key before mid point" entry
+ * @param useIndex
+ * - true to count and read index entries, false to read the data files themselves
+ * @return a map holding the mid point key under .5 and, when one was seen, the last key before the mid point whose row differs from the following key, stored
+ * under its position divided by the number of counted keys
+ * @throws IOException
+ * if there are too many files to open and useIndex is false, if no entries exist in the range, or if a found key is outside the range
+ *
+ * ISSUES : This method used the index files to find the mid point. If the map files have different index intervals this method will not return an
+ * accurate mid point. Also, it would be tricky to use this method in conjunction with an in memory map because the indexing interval is unknown.
+ */
+ public static SortedMap<Double,Key> findMidPoint(FileSystem fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles,
+ double minSplit, boolean useIndex) throws IOException {
+ Configuration conf = CachedConfiguration.getInstance();
+
+ // remembered so the data-file fallback below is not handed the reduced index files
+ Collection<FileRef> origMapFiles = mapFiles;
+
+ String tmpDir = null;
+
+ int maxToOpen = acuConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
+ ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(mapFiles.size());
+
+ try {
+ if (mapFiles.size() > maxToOpen) {
+ if (!useIndex)
+ throw new IOException("Cannot find mid point using data files, too many " + mapFiles.size());
+ tmpDir = createTmpDir(acuConf, fs);
+
+ log.debug("Too many indexes (" + mapFiles.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir);
+
+ long t1 = System.currentTimeMillis();
+ mapFiles = reduceFiles(acuConf, conf, fs, prevEndRow, endRow, mapFiles, maxToOpen, tmpDir, 0);
+ long t2 = System.currentTimeMillis();
+
+ log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (t2 - t1) / 1000.0));
+ }
+
+ // from here on a missing lower bound is represented by an empty row
+ if (prevEndRow == null)
+ prevEndRow = new Text();
+
+ long t1 = System.currentTimeMillis();
+
+ long numKeys = 0;
+
+ // when tmpDir is set the files are reduced index files, which are opened with the regular reader rather than as indexes
+ numKeys = countIndexEntries(acuConf, prevEndRow, endRow, mapFiles, tmpDir == null ? useIndex : false, conf, fs, readers);
+
+ if (numKeys == 0) {
+ if (useIndex) {
+ log.warn("Failed to find mid point using indexes, falling back to data files which is slower. No entries between " + prevEndRow + " and " + endRow
+ + " for " + mapFiles);
+ // need to pass original map files, not possibly reduced indexes
+ return findMidPoint(fs, acuConf, prevEndRow, endRow, origMapFiles, minSplit, false);
+ }
+ throw new IOException("Failed to find mid point, no entries between " + prevEndRow + " and " + endRow + " for " + mapFiles);
+ }
+
+ List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(readers);
+ MultiIterator mmfi = new MultiIterator(iters, true);
+
+ // skip every key whose row is at or before prevEndRow
+ while (mmfi.hasTop() && mmfi.getTopKey().compareRow(prevEndRow) <= 0)
+ mmfi.next();
+
+ // read half of the keys in the index
+ TreeMap<Double,Key> ret = new TreeMap<Double,Key>();
+ Key lastKey = null;
+ long keysRead = 0;
+
+ Key keyBeforeMidPoint = null;
+ long keyBeforeMidPointPosition = 0;
+
+ // NOTE(review): getTopKey() is used below without a hasTop() check; this appears to rely on numKeys matching what the merged iterator yields - confirm
+ while (keysRead < numKeys / 2) {
+ // remember the previous key whenever the row changes and at least minSplit of the keys precede it; later matches overwrite earlier ones
+ if (lastKey != null && !lastKey.equals(mmfi.getTopKey(), PartialKey.ROW) && (keysRead - 1) / (double) numKeys >= minSplit) {
+ keyBeforeMidPoint = new Key(lastKey);
+ keyBeforeMidPointPosition = keysRead - 1;
+ }
+
+ if (lastKey == null)
+ lastKey = new Key();
+
+ lastKey.set(mmfi.getTopKey());
+
+ keysRead++;
+
+ // consume minimum
+ mmfi.next();
+ }
+
+ if (keyBeforeMidPoint != null)
+ ret.put(keyBeforeMidPointPosition / (double) numKeys, keyBeforeMidPoint);
+
+ long t2 = System.currentTimeMillis();
+
+ log.debug(String.format("Found midPoint from indexes in %6.2f secs.%n", ((t2 - t1) / 1000.0)));
+
+ // the key the iterator stopped on, after numKeys / 2 keys were consumed, is reported as the mid point
+ ret.put(.5, mmfi.getTopKey());
+
+ // sanity check: every returned key must have a row in (prevEndRow, endRow]
+ for (Key key : ret.values()) {
+ boolean inRange = (key.compareRow(prevEndRow) > 0 && (endRow == null || key.compareRow(endRow) < 1));
+ if (!inRange) {
+ throw new IOException("Found mid point is not in range " + key + " " + prevEndRow + " " + endRow + " " + mapFiles);
+ }
+ }
+
+ return ret;
+ } finally {
+ // closes the readers and removes tmpDir if one was created
+ cleanupIndexOp(acuConf, tmpDir, fs, readers);
+ }
+ }
+
+ private static void cleanupIndexOp(AccumuloConfiguration acuConf, String tmpDir, FileSystem fs, ArrayList<FileSKVIterator> readers) throws IOException {
+ // close all of the index sequence files
+ for (FileSKVIterator r : readers) {
+ try {
+ if (r != null)
+ r.close();
+ } catch (IOException e) {
+ // okay, try to close the rest anyway
+ log.error(e, e);
+ }
+ }
+
+ if (tmpDir != null) {
+ String tmpPrefix = acuConf.get(Property.INSTANCE_DFS_DIR) + "/tmp";
+ if (tmpDir.startsWith(tmpPrefix))
+ fs.deleteRecursively(new Path(tmpDir));
+ else
+ log.error("Did not delete tmp dir because it wasn't a tmp dir " + tmpDir);
+ }
+ }
+
+ /**
+  * Counts, over all given files, the entries whose row lies in (prevEndRow, endRow] and appends one freshly opened reader per file to {@code readers}. Each
+  * file is opened twice: once for counting (closed again right away) and once for the reader handed back to the caller.
+  *
+  * @param useIndex
+  *          true to open the file indexes, false to open the files themselves with a range starting after prevEndRow
+  * @return the number of entries counted
+  */
+ private static long countIndexEntries(AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles, boolean useIndex,
+     Configuration conf, FileSystem fs, ArrayList<FileSKVIterator> readers) throws IOException {
+
+   long total = 0;
+
+   for (FileRef ref : mapFiles) {
+     Path path = ref.path();
+     org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
+
+     FileSKVIterator counter = null;
+     try {
+       counter = openIndexOrData(path, ns, useIndex, prevEndRow, acuConf);
+
+       for (; counter.hasTop(); counter.next()) {
+         Key top = counter.getTopKey();
+
+         boolean pastEndRow = endRow != null && top.compareRow(endRow) > 0;
+         if (pastEndRow)
+           break;
+
+         boolean afterPrevEndRow = prevEndRow == null || top.compareRow(prevEndRow) > 0;
+         if (afterPrevEndRow)
+           total++;
+       }
+     } finally {
+       try {
+         if (counter != null)
+           counter.close();
+       } catch (IOException e) {
+         log.error(e, e);
+       }
+     }
+
+     // a second, untouched reader for the caller to iterate over
+     readers.add(openIndexOrData(path, ns, useIndex, prevEndRow, acuConf));
+   }
+
+   return total;
+ }
+
+ /**
+  * Opens either the index of the file or, when useIndex is false, the file itself restricted to rows after prevEndRow.
+  */
+ private static FileSKVIterator openIndexOrData(Path path, org.apache.hadoop.fs.FileSystem ns, boolean useIndex, Text prevEndRow,
+     AccumuloConfiguration acuConf) throws IOException {
+   if (useIndex)
+     return FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf);
+
+   return FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false, ns,
+       ns.getConf(), acuConf);
+ }
+
+ /**
+  * Best-effort lookup of the first and last key of each file. Files that cannot be read are logged and left out of the result, as are files whose reader
+  * reports no first key.
+  *
+  * @return a map from file to its first/last key information, containing only the files that could be read
+  */
+ public static Map<FileRef,FileInfo> tryToGetFirstAndLastRows(FileSystem fs, AccumuloConfiguration acuConf, Set<FileRef> mapfiles) {
+
+   final HashMap<FileRef,FileInfo> infoByFile = new HashMap<FileRef,FileInfo>();
+
+   final long startMillis = System.currentTimeMillis();
+
+   for (FileRef file : mapfiles) {
+     String filePath = file.path().toString();
+     org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(filePath);
+
+     FileSKVIterator reader = null;
+     try {
+       reader = FileOperations.getInstance().openReader(filePath, false, ns, ns.getConf(), acuConf);
+
+       Key first = reader.getFirstKey();
+       if (first != null) {
+         Key last = reader.getLastKey();
+         infoByFile.put(file, new FileInfo(first, last));
+       }
+     } catch (IOException ioe) {
+       log.warn("Failed to read map file to determine first and last key : " + file, ioe);
+     } finally {
+       if (reader != null) {
+         try {
+           reader.close();
+         } catch (IOException ioe) {
+           log.warn("failed to close " + file, ioe);
+         }
+       }
+     }
+   }
+
+   final long endMillis = System.currentTimeMillis();
+
+   log.debug(String.format("Found first and last keys for %d map files in %6.2f secs", mapfiles.size(), (endMillis - startMillis) / 1000.0));
+
+   return infoByFile;
+ }
+
+ /**
+  * Returns the greatest last key over all given files, skipping files whose reader has no top key.
+  *
+  * @return the largest last key found, or null when no file contributed one
+  */
+ public static WritableComparable<Key> findLastKey(FileSystem fs, AccumuloConfiguration acuConf, Collection<FileRef> mapFiles) throws IOException {
+   Key greatest = null;
+
+   for (FileRef ref : mapFiles) {
+     Path path = ref.path();
+     org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(path);
+     FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), acuConf);
+
+     try {
+       // an empty file has no last key, so only non-empty files are considered
+       if (reader.hasTop()) {
+         Key candidate = reader.getLastKey();
+
+         if (greatest == null || candidate.compareTo(greatest) > 0)
+           greatest = candidate;
+       }
+     } finally {
+       try {
+         if (reader != null)
+           reader.close();
+       } catch (IOException e) {
+         log.error(e, e);
+       }
+     }
+   }
+
+   return greatest;
+ }
+
+ /**
+  * Minimal mutable holder for a long, so a counter stored in a map can be incremented in place.
+  */
+ private static class MLong {
+   long l;
+
+   public MLong(long i) {
+     this.l = i;
+   }
+ }
+
+ /**
+  * Estimates how many bytes of a file belong to each extent by counting how many of the file's index entries have a row contained in the extent and
+  * scaling fileSize by that share. An extent with no matching index entry is treated as having one, so its estimate is not zero when the index has entries.
+  *
+  * @param acuConf
+  *          configuration used to open the file's index
+  * @param mapFile
+  *          the file to examine
+  * @param fileSize
+  *          size of the file, distributed over the extents
+  * @param extents
+  *          the extents to estimate sizes for
+  * @param conf
+  *          not used by this method
+  * @param fs
+  *          used to resolve the underlying Hadoop file system of mapFile
+  * @return for every extent its estimated share of fileSize; when the index holds no entries at all, each extent is assigned fileSize
+  * @throws IOException
+  *           if the index cannot be opened or read
+  */
+ public static Map<KeyExtent,Long> estimateSizes(AccumuloConfiguration acuConf, Path mapFile, long fileSize, List<KeyExtent> extents, Configuration conf,
+     FileSystem fs) throws IOException {
+
+   long totalIndexEntries = 0;
+   Map<KeyExtent,MLong> counts = new TreeMap<KeyExtent,MLong>();
+   for (KeyExtent keyExtent : extents)
+     counts.put(keyExtent, new MLong(0));
+
+   Text row = new Text();
+   org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(mapFile);
+   FileSKVIterator index = FileOperations.getInstance().openIndex(mapFile.toString(), ns, ns.getConf(), acuConf);
+
+   try {
+     while (index.hasTop()) {
+       Key key = index.getTopKey();
+       totalIndexEntries++;
+       key.getRow(row);
+
+       for (Entry<KeyExtent,MLong> entry : counts.entrySet())
+         if (entry.getKey().contains(row))
+           entry.getValue().l++;
+
+       index.next();
+     }
+   } finally {
+     try {
+       if (index != null)
+         index.close();
+     } catch (IOException e) {
+       // the counts gathered so far are still usable, so only log the failed close
+       log.error(e, e);
+     }
+   }
+
+   Map<KeyExtent,Long> results = new TreeMap<KeyExtent,Long>();
+   for (KeyExtent keyExtent : extents) {
+     long estSize;
+     if (totalIndexEntries == 0) {
+       // An empty index gives no ratio to work with. Dividing by zero here would yield Infinity (or NaN) and, after the cast, Long.MAX_VALUE (or 0),
+       // so fall back to the whole file size as an upper bound.
+       estSize = fileSize;
+     } else {
+       double numEntries = counts.get(keyExtent).l;
+       if (numEntries == 0)
+         numEntries = 1;
+       estSize = (long) ((numEntries / totalIndexEntries) * fileSize);
+     }
+     results.put(keyExtent, estSize);
+   }
+   return results;
+ }
+
+}
Propchange: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/FileUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/Initialize.java Wed Jun 5 17:19:25 2013
@@ -18,7 +18,10 @@ package org.apache.accumulo.server.util;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Locale;
import java.util.Map.Entry;
import java.util.UUID;
@@ -116,7 +119,7 @@ public class Initialize {
else
log.info("Hadoop Filesystem is " + FileSystem.getDefaultUri(conf));
- log.info("Accumulo data dir is " + ServerConstants.getBaseDir());
+ log.info("Accumulo data dirs are " + Arrays.asList(ServerConstants.getBaseDirs()));
log.info("Zookeeper server is " + ServerConfiguration.getSiteConfiguration().get(Property.INSTANCE_ZK_HOST));
log.info("Checking if Zookeeper is available. If this hangs, then you need to make sure zookeeper is running");
if (!zookeeperAvailable()) {
@@ -201,6 +204,24 @@ public class Initialize {
return false;
}
}
+ private static Path[] paths(String[] paths) {
+ Path result[] = new Path[paths.length];
+ for (int i = 0; i < paths.length; i++) {
+ result[i] = new Path(paths[i]);
+ }
+ return result;
+ }
+
+ /**
+  * Returns a newly allocated array holding the elements of {@code a} followed by the elements of {@code b}. The result has the same runtime component type
+  * as {@code a}.
+  */
+ private static <T> T[] concat(T[] a, T[] b) {
+   // Arrays.copyOf always allocates, so the result never aliases a; List.toArray(a) would hand back a itself whenever b is empty.
+   T[] joined = Arrays.copyOf(a, a.length + b.length);
+   System.arraycopy(b, 0, joined, a.length, b.length);
+   return joined;
+ }
private static void initFileSystem(Opts opts, FileSystem fs, Configuration conf, UUID uuid) throws IOException {
FileStatus fstat;
@@ -208,11 +229,11 @@ public class Initialize {
// the actual disk location of the root tablet
final Path rootTablet = new Path(ServerConstants.getRootTabletDir());
- final Path tableMetadataTablet = new Path(ServerConstants.getMetadataTableDir() + Constants.TABLE_TABLET_LOCATION);
- final Path defaultMetadataTablet = new Path(ServerConstants.getMetadataTableDir() + Constants.DEFAULT_TABLET_LOCATION);
-
- final Path metadataTableDir = new Path(ServerConstants.getMetadataTableDir());
+ final Path tableMetadataTabletDirs[] = paths(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.TABLE_TABLET_LOCATION));
+ final Path defaultMetadataTabletDirs[] = paths(ServerConstants.prefix(ServerConstants.getMetadataTableDirs(), Constants.DEFAULT_TABLET_LOCATION));
+ final Path metadataTableDirs[] = paths(ServerConstants.getMetadataTableDirs());
+
fs.mkdirs(new Path(ServerConstants.getDataVersionLocation(), "" + Constants.DATA_VERSION));
// create an instance id
@@ -223,17 +244,18 @@ public class Initialize {
initMetadataConfig();
// create metadata table
- try {
- fstat = fs.getFileStatus(metadataTableDir);
- if (!fstat.isDir()) {
- log.fatal("location " + metadataTableDir.toString() + " exists but is not a directory");
- return;
- }
- } catch (FileNotFoundException fnfe) {
- // create btl dir
- if (!fs.mkdirs(metadataTableDir)) {
- log.fatal("unable to create directory " + metadataTableDir.toString());
- return;
+ for (Path mtd : metadataTableDirs) {
+ try {
+ fstat = fs.getFileStatus(mtd);
+ if (!fstat.isDir()) {
+ log.fatal("location " + mtd.toString() + " exists but is not a directory");
+ return;
+ }
+ } catch (FileNotFoundException fnfe) {
+ if (!fs.mkdirs(mtd)) {
+ log.fatal("unable to create directory " + mtd.toString());
+ return;
+ }
}
}
@@ -245,94 +267,95 @@ public class Initialize {
return;
}
} catch (FileNotFoundException fnfe) {
- // create btl dir
if (!fs.mkdirs(rootTablet)) {
log.fatal("unable to create directory " + rootTablet.toString());
return;
}
-
- // populate the root tablet with info about the default tablet
- // the root tablet contains the key extent and locations of all the
- // metadata tablets
- String initRootTabFile = ServerConstants.getMetadataTableDir() + "/root_tablet/00000_00000."
- + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
- FileSKVWriter mfw = FileOperations.getInstance().openWriter(initRootTabFile, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
- mfw.startDefaultLocalityGroup();
-
- // -----------] root tablet info
- Text rootExtent = Constants.ROOT_TABLET_EXTENT.getMetadataEntry();
-
- // root's directory
- Key rootDirKey = new Key(rootExtent, Constants.METADATA_DIRECTORY_COLUMN.getColumnFamily(), Constants.METADATA_DIRECTORY_COLUMN.getColumnQualifier(), 0);
- mfw.append(rootDirKey, new Value("/root_tablet".getBytes()));
-
- // root's prev row
- Key rootPrevRowKey = new Key(rootExtent, Constants.METADATA_PREV_ROW_COLUMN.getColumnFamily(), Constants.METADATA_PREV_ROW_COLUMN.getColumnQualifier(), 0);
- mfw.append(rootPrevRowKey, new Value(new byte[] {0}));
-
- // ----------] table tablet info
- Text tableExtent = new Text(KeyExtent.getMetadataEntry(new Text(Constants.METADATA_TABLE_ID), Constants.METADATA_RESERVED_KEYSPACE_START_KEY.getRow()));
-
- // table tablet's directory
- Key tableDirKey = new Key(tableExtent, Constants.METADATA_DIRECTORY_COLUMN.getColumnFamily(), Constants.METADATA_DIRECTORY_COLUMN.getColumnQualifier(), 0);
- mfw.append(tableDirKey, new Value(Constants.TABLE_TABLET_LOCATION.getBytes()));
-
- // table tablet time
- Key tableTimeKey = new Key(tableExtent, Constants.METADATA_TIME_COLUMN.getColumnFamily(), Constants.METADATA_TIME_COLUMN.getColumnQualifier(), 0);
- mfw.append(tableTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
-
- // table tablet's prevrow
- Key tablePrevRowKey = new Key(tableExtent, Constants.METADATA_PREV_ROW_COLUMN.getColumnFamily(), Constants.METADATA_PREV_ROW_COLUMN.getColumnQualifier(),
- 0);
- mfw.append(tablePrevRowKey, KeyExtent.encodePrevEndRow(new Text(KeyExtent.getMetadataEntry(new Text(Constants.METADATA_TABLE_ID), null))));
-
- // ----------] default tablet info
- Text defaultExtent = new Text(KeyExtent.getMetadataEntry(new Text(Constants.METADATA_TABLE_ID), null));
-
- // default's directory
- Key defaultDirKey = new Key(defaultExtent, Constants.METADATA_DIRECTORY_COLUMN.getColumnFamily(),
- Constants.METADATA_DIRECTORY_COLUMN.getColumnQualifier(), 0);
- mfw.append(defaultDirKey, new Value(Constants.DEFAULT_TABLET_LOCATION.getBytes()));
-
- // default's time
- Key defaultTimeKey = new Key(defaultExtent, Constants.METADATA_TIME_COLUMN.getColumnFamily(), Constants.METADATA_TIME_COLUMN.getColumnQualifier(), 0);
- mfw.append(defaultTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
-
- // default's prevrow
- Key defaultPrevRowKey = new Key(defaultExtent, Constants.METADATA_PREV_ROW_COLUMN.getColumnFamily(),
- Constants.METADATA_PREV_ROW_COLUMN.getColumnQualifier(), 0);
- mfw.append(defaultPrevRowKey, KeyExtent.encodePrevEndRow(Constants.METADATA_RESERVED_KEYSPACE_START_KEY.getRow()));
-
- mfw.close();
}
+ // populate the root tablet with info about the default tablet
+ // the root tablet contains the key extent and locations of all the
+ // metadata tablets
+ String initRootTabFile = rootTablet + "/00000_00000."
+ + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration());
+ FileSKVWriter mfw = FileOperations.getInstance().openWriter(initRootTabFile, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
+ mfw.startDefaultLocalityGroup();
+
+ // -----------] root tablet info
+ Text rootExtent = Constants.ROOT_TABLET_EXTENT.getMetadataEntry();
+
+ // root's directory
+ Key rootDirKey = new Key(rootExtent, Constants.METADATA_DIRECTORY_COLUMN.getColumnFamily(), Constants.METADATA_DIRECTORY_COLUMN.getColumnQualifier(), 0);
+ mfw.append(rootDirKey, new Value("/root_tablet".getBytes()));
+
+ // root's prev row
+ Key rootPrevRowKey = new Key(rootExtent, Constants.METADATA_PREV_ROW_COLUMN.getColumnFamily(), Constants.METADATA_PREV_ROW_COLUMN.getColumnQualifier(), 0);
+ mfw.append(rootPrevRowKey, new Value(new byte[] {0}));
+
+ // ----------] table tablet info
+ Text tableExtent = new Text(KeyExtent.getMetadataEntry(new Text(Constants.METADATA_TABLE_ID), Constants.METADATA_RESERVED_KEYSPACE_START_KEY.getRow()));
+
+ // table tablet's directory
+ Key tableDirKey = new Key(tableExtent, Constants.METADATA_DIRECTORY_COLUMN.getColumnFamily(), Constants.METADATA_DIRECTORY_COLUMN.getColumnQualifier(), 0);
+ mfw.append(tableDirKey, new Value(Constants.TABLE_TABLET_LOCATION.getBytes()));
+
+ // table tablet time
+ Key tableTimeKey = new Key(tableExtent, Constants.METADATA_TIME_COLUMN.getColumnFamily(), Constants.METADATA_TIME_COLUMN.getColumnQualifier(), 0);
+ mfw.append(tableTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+
+ // table tablet's prevrow
+ Key tablePrevRowKey = new Key(tableExtent, Constants.METADATA_PREV_ROW_COLUMN.getColumnFamily(), Constants.METADATA_PREV_ROW_COLUMN.getColumnQualifier(),
+ 0);
+ mfw.append(tablePrevRowKey, KeyExtent.encodePrevEndRow(new Text(KeyExtent.getMetadataEntry(new Text(Constants.METADATA_TABLE_ID), null))));
+
+ // ----------] default tablet info
+ Text defaultExtent = new Text(KeyExtent.getMetadataEntry(new Text(Constants.METADATA_TABLE_ID), null));
+
+ // default's directory
+ Key defaultDirKey = new Key(defaultExtent, Constants.METADATA_DIRECTORY_COLUMN.getColumnFamily(),
+ Constants.METADATA_DIRECTORY_COLUMN.getColumnQualifier(), 0);
+ mfw.append(defaultDirKey, new Value(Constants.DEFAULT_TABLET_LOCATION.getBytes()));
+
+ // default's time
+ Key defaultTimeKey = new Key(defaultExtent, Constants.METADATA_TIME_COLUMN.getColumnFamily(), Constants.METADATA_TIME_COLUMN.getColumnQualifier(), 0);
+ mfw.append(defaultTimeKey, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes()));
+
+ // default's prevrow
+ Key defaultPrevRowKey = new Key(defaultExtent, Constants.METADATA_PREV_ROW_COLUMN.getColumnFamily(),
+ Constants.METADATA_PREV_ROW_COLUMN.getColumnQualifier(), 0);
+ mfw.append(defaultPrevRowKey, KeyExtent.encodePrevEndRow(Constants.METADATA_RESERVED_KEYSPACE_START_KEY.getRow()));
+
+ mfw.close();
+
// create table and default tablets directories
- try {
- fstat = fs.getFileStatus(defaultMetadataTablet);
- if (!fstat.isDir()) {
- log.fatal("location " + defaultMetadataTablet.toString() + " exists but is not a directory");
- return;
- }
- } catch (FileNotFoundException fnfe) {
+ for (Path dir : concat(defaultMetadataTabletDirs, tableMetadataTabletDirs)) {
try {
- fstat = fs.getFileStatus(tableMetadataTablet);
+ fstat = fs.getFileStatus(dir);
if (!fstat.isDir()) {
- log.fatal("location " + tableMetadataTablet.toString() + " exists but is not a directory");
+ log.fatal("location " + dir.toString() + " exists but is not a directory");
return;
}
- } catch (FileNotFoundException fnfe2) {
- // create table info dir
- if (!fs.mkdirs(tableMetadataTablet)) {
- log.fatal("unable to create directory " + tableMetadataTablet.toString());
+ } catch (FileNotFoundException fnfe) {
+ try {
+ fstat = fs.getFileStatus(dir);
+ if (!fstat.isDir()) {
+ log.fatal("location " + dir.toString() + " exists but is not a directory");
+ return;
+ }
+ } catch (FileNotFoundException fnfe2) {
+ // create table info dir
+ if (!fs.mkdirs(dir)) {
+ log.fatal("unable to create directory " + dir.toString());
+ return;
+ }
+ }
+
+ // create default dir
+ if (!fs.mkdirs(dir)) {
+ log.fatal("unable to create directory " + dir.toString());
return;
}
}
-
- // create default dir
- if (!fs.mkdirs(defaultMetadataTablet)) {
- log.fatal("unable to create directory " + defaultMetadataTablet.toString());
- return;
- }
}
}
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/LocalityCheck.java Wed Jun 5 17:19:25 2013
@@ -23,17 +23,15 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class LocalityCheck {
@@ -42,7 +40,7 @@ public class LocalityCheck {
ClientOpts opts = new ClientOpts();
opts.parseArgs(LocalityCheck.class.getName(), args);
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+ FileSystem fs = FileSystemImpl.get();
Connector connector = opts.getConnector();
Scanner scanner = connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
scanner.fetchColumnFamily(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY);
@@ -62,7 +60,8 @@ public class LocalityCheck {
addBlocks(fs, host, files, totalBlocks, localBlocks);
files.clear();
} else if (key.compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
- files.add(new String(KeyExtent.tableOfMetadataRow(key.getRow())) + slash(key.getColumnQualifier().toString()));
+
+ files.add(fs.getFullPath(key));
}
}
System.out.println(" Server %local total blocks");
@@ -72,12 +71,6 @@ public class LocalityCheck {
return 0;
}
- private static String slash(String path) {
- if (path.startsWith("/"))
- return path;
- return "/" + path;
- }
-
private void addBlocks(FileSystem fs, String host, ArrayList<String> files, Map<String,Long> totalBlocks, Map<String,Long> localBlocks) throws Exception {
long allBlocks = 0;
long matchingBlocks = 0;
@@ -86,9 +79,10 @@ public class LocalityCheck {
localBlocks.put(host, 0L);
}
for (String file : files) {
- Path filePath = new Path(ServerConstants.getTablesDir() + "/" + file);
- FileStatus fileStatus = fs.getFileStatus(filePath);
- BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+ Path filePath = new Path(file);
+ org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(filePath);
+ FileStatus fileStatus = ns.getFileStatus(filePath);
+ BlockLocation[] fileBlockLocations = ns.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for (BlockLocation blockLocation : fileBlockLocations) {
allBlocks++;
for (String location : blockLocation.getHosts()) {
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java Wed Jun 5 17:19:25 2013
@@ -53,11 +53,10 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileUtil;
+import org.apache.accumulo.server.util.FileUtil;
import org.apache.accumulo.core.security.CredentialHelper;
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.core.util.Pair;
@@ -69,14 +68,14 @@ import org.apache.accumulo.fate.zookeepe
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
-import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileRef;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.security.SecurityConstants;
-import org.apache.accumulo.server.trace.TraceFileSystem;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -150,8 +149,8 @@ public class MetadataTable extends org.a
* @param flushId
*
*/
- public static void updateTabletDataFile(KeyExtent extent, String path, String mergeFile, DataFileValue dfv, String time, TCredentials credentials,
- Set<String> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
+ public static void updateTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time, TCredentials credentials,
+ Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) {
if (extent.equals(Constants.ROOT_TABLET_EXTENT)) {
if (unusedWalLogs != null) {
IZooReaderWriter zk = ZooReaderWriter.getInstance();
@@ -187,7 +186,7 @@ public class MetadataTable extends org.a
Mutation m = new Mutation(extent.getMetadataEntry());
if (dfv.getNumEntries() > 0) {
- m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(path), new Value(dfv.encode()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, path.meta(), new Value(dfv.encode()));
Constants.METADATA_TIME_COLUMN.put(m, new Value(time.getBytes()));
// stuff in this location
TServerInstance self = getTServerInstance(address, zooLock);
@@ -202,11 +201,11 @@ public class MetadataTable extends org.a
}
}
- for (String scanFile : filesInUseByScans)
- m.put(Constants.METADATA_SCANFILE_COLUMN_FAMILY, new Text(scanFile), new Value("".getBytes()));
+ for (FileRef scanFile : filesInUseByScans)
+ m.put(Constants.METADATA_SCANFILE_COLUMN_FAMILY, scanFile.meta(), new Value("".getBytes()));
if (mergeFile != null)
- m.putDelete(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(mergeFile));
+ m.putDelete(Constants.METADATA_DATAFILE_COLUMN_FAMILY, mergeFile.meta());
Constants.METADATA_FLUSH_COLUMN.put(m, new Value((flushId + "").getBytes()));
@@ -243,12 +242,12 @@ public class MetadataTable extends org.a
}
}
- public static void updateTabletDataFile(long tid, KeyExtent extent, Map<String,DataFileValue> estSizes, String time, TCredentials credentials, ZooLock zooLock) {
+ public static void updateTabletDataFile(long tid, KeyExtent extent, Map<FileRef,DataFileValue> estSizes, String time, TCredentials credentials, ZooLock zooLock) {
Mutation m = new Mutation(extent.getMetadataEntry());
byte[] tidBytes = Long.toString(tid).getBytes();
- for (Entry<String,DataFileValue> entry : estSizes.entrySet()) {
- Text file = new Text(entry.getKey());
+ for (Entry<FileRef,DataFileValue> entry : estSizes.entrySet()) {
+ Text file = entry.getKey().meta();
m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, file, new Value(entry.getValue().encode()));
m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, file, new Value(tidBytes));
}
@@ -335,12 +334,13 @@ public class MetadataTable extends org.a
return false;
}
- public static SortedMap<String,DataFileValue> getDataFileSizes(KeyExtent extent, TCredentials credentials) {
- TreeMap<String,DataFileValue> sizes = new TreeMap<String,DataFileValue>();
+ public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, TCredentials credentials) throws IOException {
+ TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
Scanner mdScanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS);
mdScanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
Text row = extent.getMetadataEntry();
+ FileSystem fs = FileSystemImpl.get();
Key endKey = new Key(row, Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(""));
endKey = endKey.followingKey(PartialKey.ROW_COLFAM);
@@ -351,14 +351,14 @@ public class MetadataTable extends org.a
if (!entry.getKey().getRow().equals(row))
break;
DataFileValue dfv = new DataFileValue(entry.getValue().get());
- sizes.put(entry.getKey().getColumnQualifier().toString(), dfv);
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
}
return sizes;
}
- public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map<String,DataFileValue> datafileSizes,
- Map<String,Long> bulkLoadedFiles, TCredentials credentials, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
+ public static void addNewTablet(KeyExtent extent, String path, TServerInstance location, Map<FileRef,DataFileValue> datafileSizes,
+ Map<FileRef,Long> bulkLoadedFiles, TCredentials credentials, String time, long lastFlushID, long lastCompactID, ZooLock zooLock) {
Mutation m = extent.getPrevRowUpdateMutation();
Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value(path.getBytes()));
@@ -373,13 +373,13 @@ public class MetadataTable extends org.a
m.putDelete(Constants.METADATA_FUTURE_LOCATION_COLUMN_FAMILY, location.asColumnQualifier());
}
- for (Entry<String,DataFileValue> entry : datafileSizes.entrySet()) {
- m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(entry.getValue().encode()));
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, entry.getKey().meta(), new Value(entry.getValue().encode()));
}
- for (Entry<String,Long> entry : bulkLoadedFiles.entrySet()) {
+ for (Entry<FileRef,Long> entry : bulkLoadedFiles.entrySet()) {
byte[] tidBytes = Long.toString(entry.getValue()).getBytes();
- m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(tidBytes));
+ m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, entry.getKey().meta(), new Value(tidBytes));
}
update(credentials, zooLock, m);
@@ -403,35 +403,35 @@ public class MetadataTable extends org.a
update(credentials, zooLock, m);
}
- public static void finishSplit(Text metadataEntry, Map<String,DataFileValue> datafileSizes, List<String> highDatafilesToRemove, TCredentials credentials,
+ public static void finishSplit(Text metadataEntry, Map<FileRef,DataFileValue> datafileSizes, List<FileRef> highDatafilesToRemove, TCredentials credentials,
ZooLock zooLock) {
Mutation m = new Mutation(metadataEntry);
Constants.METADATA_SPLIT_RATIO_COLUMN.putDelete(m);
Constants.METADATA_OLD_PREV_ROW_COLUMN.putDelete(m);
Constants.METADATA_CHOPPED_COLUMN.putDelete(m);
- for (Entry<String,DataFileValue> entry : datafileSizes.entrySet()) {
- m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(entry.getKey()), new Value(entry.getValue().encode()));
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, entry.getKey().meta(), new Value(entry.getValue().encode()));
}
- for (String pathToRemove : highDatafilesToRemove) {
- m.putDelete(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(pathToRemove));
+ for (FileRef pathToRemove : highDatafilesToRemove) {
+ m.putDelete(Constants.METADATA_DATAFILE_COLUMN_FAMILY, pathToRemove.meta());
}
update(credentials, zooLock, m);
}
- public static void finishSplit(KeyExtent extent, Map<String,DataFileValue> datafileSizes, List<String> highDatafilesToRemove, TCredentials credentials,
+ public static void finishSplit(KeyExtent extent, Map<FileRef,DataFileValue> datafileSizes, List<FileRef> highDatafilesToRemove, TCredentials credentials,
ZooLock zooLock) {
finishSplit(extent.getMetadataEntry(), datafileSizes, highDatafilesToRemove, credentials, zooLock);
}
- public static void replaceDatafiles(KeyExtent extent, Set<String> datafilesToDelete, Set<String> scanFiles, String path, Long compactionId,
+ public static void replaceDatafiles(KeyExtent extent, Set<FileRef> datafilesToDelete, Set<FileRef> scanFiles, FileRef path, Long compactionId,
DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock) {
replaceDatafiles(extent, datafilesToDelete, scanFiles, path, compactionId, size, credentials, address, lastLocation, zooLock, true);
}
- public static void replaceDatafiles(KeyExtent extent, Set<String> datafilesToDelete, Set<String> scanFiles, String path, Long compactionId,
+ public static void replaceDatafiles(KeyExtent extent, Set<FileRef> datafilesToDelete, Set<FileRef> scanFiles, FileRef path, Long compactionId,
DataFileValue size, TCredentials credentials, String address, TServerInstance lastLocation, ZooLock zooLock, boolean insertDeleteFlags) {
if (insertDeleteFlags) {
@@ -442,14 +442,14 @@ public class MetadataTable extends org.a
// replace data file references to old mapfiles with the new mapfiles
Mutation m = new Mutation(extent.getMetadataEntry());
- for (String pathToRemove : datafilesToDelete)
- m.putDelete(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(pathToRemove));
+ for (FileRef pathToRemove : datafilesToDelete)
+ m.putDelete(Constants.METADATA_DATAFILE_COLUMN_FAMILY, pathToRemove.meta());
- for (String scanFile : scanFiles)
- m.put(Constants.METADATA_SCANFILE_COLUMN_FAMILY, new Text(scanFile), new Value("".getBytes()));
+ for (FileRef scanFile : scanFiles)
+ m.put(Constants.METADATA_SCANFILE_COLUMN_FAMILY, scanFile.meta(), new Value("".getBytes()));
if (size.getNumEntries() > 0)
- m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text(path), new Value(size.encode()));
+ m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, path.meta(), new Value(size.encode()));
if (compactionId != null)
Constants.METADATA_COMPACT_COLUMN.put(m, new Value(("" + compactionId).getBytes()));
@@ -464,13 +464,13 @@ public class MetadataTable extends org.a
update(credentials, zooLock, m);
}
- public static void addDeleteEntries(KeyExtent extent, Set<String> datafilesToDelete, TCredentials credentials) {
+ public static void addDeleteEntries(KeyExtent extent, Set<FileRef> datafilesToDelete, TCredentials credentials) {
String tableId = extent.getTableId().toString();
// TODO could use a batch writer, but would need to handle failure and retry like update does - ACCUMULO-1294
- for (String pathToRemove : datafilesToDelete)
- update(credentials, createDeleteMutation(tableId, pathToRemove));
+ for (FileRef pathToRemove : datafilesToDelete)
+ update(credentials, createDeleteMutation(tableId, pathToRemove.meta().toString()));
}
public static void addDeleteEntry(String tableId, String path) {
@@ -492,17 +492,17 @@ public class MetadataTable extends org.a
return delFlag;
}
- public static void removeScanFiles(KeyExtent extent, Set<String> scanFiles, TCredentials credentials, ZooLock zooLock) {
+ public static void removeScanFiles(KeyExtent extent, Set<FileRef> scanFiles, TCredentials credentials, ZooLock zooLock) {
Mutation m = new Mutation(extent.getMetadataEntry());
- for (String pathToRemove : scanFiles)
- m.putDelete(Constants.METADATA_SCANFILE_COLUMN_FAMILY, new Text(pathToRemove));
+ for (FileRef pathToRemove : scanFiles)
+ m.putDelete(Constants.METADATA_SCANFILE_COLUMN_FAMILY, pathToRemove.meta());
update(credentials, zooLock, m);
}
private static KeyExtent fixSplit(Text table, Text metadataEntry, Text metadataPrevEndRow, Value oper, double splitRatio, TServerInstance tserver,
- TCredentials credentials, String time, long initFlushID, long initCompactID, ZooLock lock) throws AccumuloException {
+ TCredentials credentials, String time, long initFlushID, long initCompactID, ZooLock lock) throws AccumuloException, IOException {
if (metadataPrevEndRow == null)
// something is wrong, this should not happen... if a tablet is split, it will always have a
// prev end row....
@@ -514,6 +514,7 @@ public class MetadataTable extends org.a
ScannerImpl scanner2 = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS);
scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW)));
+ FileSystem fs = FileSystemImpl.get();
if (!scanner2.iterator().hasNext()) {
log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow);
rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), credentials, lock);
@@ -521,24 +522,24 @@ public class MetadataTable extends org.a
} else {
log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow);
- List<String> highDatafilesToRemove = new ArrayList<String>();
+ List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
Scanner scanner3 = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS);
Key rowKey = new Key(metadataEntry);
- SortedMap<String,DataFileValue> origDatafileSizes = new TreeMap<String,DataFileValue>();
- SortedMap<String,DataFileValue> highDatafileSizes = new TreeMap<String,DataFileValue>();
- SortedMap<String,DataFileValue> lowDatafileSizes = new TreeMap<String,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> origDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
scanner3.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW)));
for (Entry<Key,Value> entry : scanner3) {
if (entry.getKey().compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
- origDatafileSizes.put(entry.getKey().getColumnQualifier().toString(), new DataFileValue(entry.getValue().get()));
+ origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get()));
}
}
- splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<String,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes, highDatafileSizes,
+ splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), origDatafileSizes, lowDatafileSizes, highDatafileSizes,
highDatafilesToRemove);
MetadataTable.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, credentials, lock);
@@ -549,11 +550,11 @@ public class MetadataTable extends org.a
}
- public static void splitDatafiles(Text table, Text midRow, double splitRatio, Map<String,FileUtil.FileInfo> firstAndLastRows,
- SortedMap<String,DataFileValue> datafiles, SortedMap<String,DataFileValue> lowDatafileSizes, SortedMap<String,DataFileValue> highDatafileSizes,
- List<String> highDatafilesToRemove) {
+ public static void splitDatafiles(Text table, Text midRow, double splitRatio, Map<FileRef,FileUtil.FileInfo> firstAndLastRows,
+ SortedMap<FileRef,DataFileValue> datafiles, SortedMap<FileRef,DataFileValue> lowDatafileSizes, SortedMap<FileRef,DataFileValue> highDatafileSizes,
+ List<FileRef> highDatafilesToRemove) {
- for (Entry<String,DataFileValue> entry : datafiles.entrySet()) {
+ for (Entry<FileRef,DataFileValue> entry : datafiles.entrySet()) {
Text firstRow = null;
Text lastRow = null;
@@ -593,7 +594,7 @@ public class MetadataTable extends org.a
}
public static KeyExtent fixSplit(Text metadataEntry, SortedMap<ColumnFQ,Value> columns, TServerInstance tserver, TCredentials credentials, ZooLock lock)
- throws AccumuloException {
+ throws AccumuloException, IOException {
log.info("Incomplete split " + metadataEntry + " attempting to fix");
Value oper = columns.get(Constants.METADATA_OLD_PREV_ROW_COLUMN);
@@ -785,22 +786,23 @@ public class MetadataTable extends org.a
return e;
}
- public static Pair<List<LogEntry>,SortedMap<String,DataFileValue>> getFileAndLogEntries(TCredentials credentials, KeyExtent extent) throws KeeperException,
+ public static Pair<List<LogEntry>,SortedMap<FileRef,DataFileValue>> getFileAndLogEntries(TCredentials credentials, KeyExtent extent) throws KeeperException,
InterruptedException, IOException {
ArrayList<LogEntry> result = new ArrayList<LogEntry>();
- TreeMap<String,DataFileValue> sizes = new TreeMap<String,DataFileValue>();
+ TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>();
+ FileSystem fs = FileSystemImpl.get();
if (extent.isRootTablet()) {
getRootLogEntries(result);
- FileSystem fs = TraceFileSystem.wrap(FileUtil.getFileSystem(CachedConfiguration.getInstance(), ServerConfiguration.getSiteConfiguration()));
- FileStatus[] files = fs.listStatus(new Path(ServerConstants.getRootTabletDir()));
-
+ Path rootDir = new Path(ServerConstants.getRootTabletDir());
+ rootDir = rootDir.makeQualified(fs.getDefaultNamespace());
+ FileStatus[] files = fs.listStatus(rootDir);
for (FileStatus fileStatus : files) {
if (fileStatus.getPath().toString().endsWith("_tmp")) {
continue;
}
DataFileValue dfv = new DataFileValue(0, 0);
- sizes.put(Constants.ZROOT_TABLET + "/" + fileStatus.getPath().getName(), dfv);
+ sizes.put(new FileRef(fileStatus.getPath().toString(), fileStatus.getPath()), dfv);
}
} else {
@@ -818,14 +820,14 @@ public class MetadataTable extends org.a
result.add(entryFromKeyValue(entry.getKey(), entry.getValue()));
} else if (entry.getKey().getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
DataFileValue dfv = new DataFileValue(entry.getValue().get());
- sizes.put(entry.getKey().getColumnQualifier().toString(), dfv);
+ sizes.put(new FileRef(fs, entry.getKey()), dfv);
} else {
throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily());
}
}
}
- return new Pair<List<LogEntry>,SortedMap<String,DataFileValue>>(result, sizes);
+ return new Pair<List<LogEntry>,SortedMap<FileRef,DataFileValue>>(result, sizes);
}
public static List<LogEntry> getLogEntries(TCredentials credentials, KeyExtent extent) throws IOException, KeeperException, InterruptedException {
@@ -961,8 +963,9 @@ public class MetadataTable extends org.a
for (Entry<Key,Value> entry : tablet.entrySet()) {
if (entry.getKey().getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
String cf = entry.getKey().getColumnQualifier().toString();
- if (srcTableId != null && !cf.startsWith("../"))
+ if (srcTableId != null && !cf.startsWith("../") && !cf.contains(":")) {
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
+ }
files.add(cf);
}
}
@@ -976,7 +979,7 @@ public class MetadataTable extends org.a
for (Entry<Key,Value> entry : tablet.entrySet()) {
if (entry.getKey().getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
String cf = entry.getKey().getColumnQualifier().toString();
- if (!cf.startsWith("../"))
+ if (!cf.startsWith("../") && !cf.contains(":"))
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue());
} else if (entry.getKey().getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) {
@@ -1177,15 +1180,16 @@ public class MetadataTable extends org.a
bw.close();
}
- public static List<String> getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) {
- List<String> result = new ArrayList<String>();
+ public static List<FileRef> getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) throws IOException {
+ List<FileRef> result = new ArrayList<FileRef>();
try {
+ FileSystem fs = FileSystemImpl.get();
Scanner mscanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
mscanner.setRange(extent.toMetadataRange());
mscanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
for (Entry<Key,Value> entry : mscanner) {
if (Long.parseLong(entry.getValue().toString()) == tid) {
- result.add(entry.getKey().getColumnQualifier().toString());
+ result.add(new FileRef(fs, entry.getKey()));
}
}
return result;
@@ -1195,22 +1199,21 @@ public class MetadataTable extends org.a
}
}
- public static Map<String,Long> getBulkFilesLoaded(TCredentials credentials, KeyExtent extent) {
+ public static Map<FileRef,Long> getBulkFilesLoaded(TCredentials credentials, KeyExtent extent) throws IOException {
return getBulkFilesLoaded(credentials, extent.getMetadataEntry());
}
- public static Map<String,Long> getBulkFilesLoaded(TCredentials credentials, Text metadataRow) {
+ public static Map<FileRef,Long> getBulkFilesLoaded(TCredentials credentials, Text metadataRow) throws IOException {
- Map<String,Long> ret = new HashMap<String,Long>();
+ Map<FileRef,Long> ret = new HashMap<FileRef,Long>();
+ FileSystem fs = FileSystemImpl.get();
Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), credentials, Constants.METADATA_TABLE_ID, Constants.NO_AUTHS);
scanner.setRange(new Range(metadataRow));
scanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
for (Entry<Key,Value> entry : scanner) {
- String file = entry.getKey().getColumnQualifier().toString();
Long tid = Long.parseLong(entry.getValue().toString());
-
- ret.put(file, tid);
+ ret.put(new FileRef(fs, entry.getKey()), tid);
}
return ret;
}
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/OfflineMetadataScanner.java Wed Jun 5 17:19:25 2013
@@ -45,15 +45,15 @@ import org.apache.accumulo.core.iterator
import org.apache.accumulo.core.iterators.system.MultiIterator;
import org.apache.accumulo.core.iterators.system.VisibilityFilter;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.util.MetadataTable.LogEntry;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class OfflineMetadataScanner extends ScannerOptions implements Scanner {
@@ -66,7 +66,8 @@ public class OfflineMetadataScanner exte
private List<SortedKeyValueIterator<Key,Value>> openMapFiles(Collection<String> files, FileSystem fs, AccumuloConfiguration conf) throws IOException {
List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<SortedKeyValueIterator<Key,Value>>();
for (String file : files) {
- FileSKVIterator reader = FileOperations.getInstance().openReader(file, true, fs, fs.getConf(), conf);
+ org.apache.hadoop.fs.FileSystem ns = fs.getFileSystemByPath(file);
+ FileSKVIterator reader = FileOperations.getInstance().openReader(file, true, ns, ns.getConf(), conf);
readers.add(reader);
}
return readers;
@@ -150,7 +151,7 @@ public class OfflineMetadataScanner exte
while (ssi.hasTop()) {
if (ssi.getTopKey().compareColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY) == 0) {
- allFiles.add(ServerConstants.getMetadataTableDir() + "/" + ssi.getTopKey().getColumnQualifier().toString());
+ allFiles.add(fs.getFullPath(ssi.getTopKey()));
} else {
walogs++;
}
@@ -255,8 +256,8 @@ public class OfflineMetadataScanner exte
}
public static void main(String[] args) throws IOException {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
ServerConfiguration conf = new ServerConfiguration(HdfsZooInstance.getInstance());
+ FileSystem fs = FileSystemImpl.get();
OfflineMetadataScanner scanner = new OfflineMetadataScanner(conf.getConfiguration(), fs);
scanner.setRange(Constants.METADATA_KEYSPACE);
for (Entry<Key,Value> entry : scanner)
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/RemoveEntriesForMissingFiles.java Wed Jun 5 17:19:25 2013
@@ -19,19 +19,17 @@ package org.apache.accumulo.server.util;
import java.util.Map.Entry;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.server.cli.ClientOpts;
import org.apache.accumulo.core.cli.BatchWriterOpts;
import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.server.ServerConstants;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.accumulo.server.cli.ClientOpts;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
@@ -54,7 +52,7 @@ public class RemoveEntriesForMissingFile
ScannerOpts scanOpts = new ScannerOpts();
BatchWriterOpts bwOpts = new BatchWriterOpts();
opts.parseArgs(RemoveEntriesForMissingFiles.class.getName(), args, scanOpts, bwOpts);
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+ FileSystem fs = FileSystemImpl.get();
Connector connector = opts.getConnector();
Scanner metadata = connector.createScanner(Constants.METADATA_TABLE_NAME, opts.auths);
metadata.setBatchSize(scanOpts.scanBatchSize);
@@ -68,11 +66,7 @@ public class RemoveEntriesForMissingFile
for (Entry<Key,Value> entry : metadata) {
count++;
Key key = entry.getKey();
- String table = new String(KeyExtent.tableOfMetadataRow(entry.getKey().getRow()));
- String file = key.getColumnQualifier().toString();
- if (!file.startsWith("/"))
- file = "/" + file;
- Path map = new Path(ServerConstants.getTablesDir() + "/" + table + file);
+ Path map = new Path(fs.getFullPath(key));
if (!fs.exists(map)) {
missing++;
log.info("File " + map + " is missing");
Modified: accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java (original)
+++ accumulo/branches/ACCUMULO-118/server/src/main/java/org/apache/accumulo/server/util/TabletOperations.java Wed Jun 5 17:19:25 2013
@@ -17,12 +17,14 @@
package org.apache.accumulo.server.util;
import java.io.IOException;
+import java.util.Random;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
@@ -31,32 +33,37 @@ public class TabletOperations {
private static final Logger log = Logger.getLogger(TabletOperations.class);
- public static String createTabletDirectory(FileSystem fs, String tableDir, Text endRow) {
+ private static final Random random = new Random();
+
+ // TODO ACCUMULO-118 make the namespace selection pluggable
+ public static String createTabletDirectory(FileSystem fs, String tableId, Text endRow) {
String lowDirectory;
UniqueNameAllocator namer = UniqueNameAllocator.getInstance();
+ String[] tablesDirs = ServerConstants.getTablesDirs();
+ String randomNamespace = tablesDirs[random.nextInt(tablesDirs.length)];
while (true) {
try {
if (endRow == null) {
lowDirectory = Constants.DEFAULT_TABLET_LOCATION;
- Path lowDirectoryPath = new Path(tableDir + lowDirectory);
+ Path lowDirectoryPath = new Path(randomNamespace + "/" + tableId + "/" + lowDirectory);
if (fs.exists(lowDirectoryPath) || fs.mkdirs(lowDirectoryPath))
- return lowDirectory;
+ return lowDirectoryPath.makeQualified(fs.getFileSystemByPath(lowDirectoryPath)).toString();
log.warn("Failed to create " + lowDirectoryPath + " for unknown reason");
} else {
lowDirectory = "/" + Constants.GENERATED_TABLET_DIRECTORY_PREFIX + namer.getNextName();
- Path lowDirectoryPath = new Path(tableDir + lowDirectory);
+ Path lowDirectoryPath = new Path(randomNamespace + "/" + tableId + "/" + lowDirectory);
if (fs.exists(lowDirectoryPath))
throw new IllegalStateException("Dir exist when it should not " + lowDirectoryPath);
if (fs.mkdirs(lowDirectoryPath))
- return lowDirectory;
+ return lowDirectoryPath.makeQualified(fs.getFileSystemByPath(lowDirectoryPath)).toString();
}
} catch (IOException e) {
log.warn(e);
}
- log.warn("Failed to create dir for tablet in table " + tableDir + " will retry ...");
+ log.warn("Failed to create dir for tablet in table " + tableId + " in namespace " + randomNamespace + " will retry ...");
UtilWaitThread.sleep(3000);
}
@@ -65,7 +72,7 @@ public class TabletOperations {
public static String createTabletDirectory(String tableDir, Text endRow) {
while (true) {
try {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
+ FileSystem fs = FileSystemImpl.get();
return createTabletDirectory(fs, tableDir, endRow);
} catch (IOException e) {
log.warn(e);
Modified: accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java (original)
+++ accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/continuous/ContinuousStatsCollector.java Wed Jun 5 17:19:25 2013
@@ -22,9 +22,7 @@ import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
-import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.server.cli.ClientOnRequiredTable;
import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
@@ -42,12 +40,14 @@ import org.apache.accumulo.core.master.t
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.core.util.Stat;
import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.cli.ClientOnRequiredTable;
+import org.apache.accumulo.server.fs.FileSystem;
+import org.apache.accumulo.server.fs.FileSystemImpl;
import org.apache.accumulo.server.monitor.Monitor;
import org.apache.accumulo.server.security.SecurityConstants;
+import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
@@ -110,15 +110,21 @@ public class ContinuousStatsCollector {
}
private String getFSStats() throws Exception {
- FileSystem fs = FileSystem.get(CachedConfiguration.getInstance());
- Path acudir = new Path(ServerConstants.getTablesDir());
- ContentSummary contentSummary = fs.getContentSummary(acudir);
-
- Path tableDir = new Path(ServerConstants.getTablesDir() + "/" + tableId);
- ContentSummary contentSummary2 = fs.getContentSummary(tableDir);
+ FileSystem fs = FileSystemImpl.get();
+ long length1 = 0, dcount1 = 0, fcount1 = 0;
+ long length2 = 0, dcount2 = 0, fcount2 = 0;
+ for (String dir : ServerConstants.getTablesDirs()) {
+ ContentSummary contentSummary = fs.getContentSummary(dir);
+ length1 += contentSummary.getLength();
+ dcount1 += contentSummary.getDirectoryCount();
+ fcount1 += contentSummary.getFileCount();
+ contentSummary = fs.getContentSummary(dir + "/" + tableId);
+ length2 += contentSummary.getLength();
+ dcount2 += contentSummary.getDirectoryCount();
+ fcount2 += contentSummary.getFileCount();
+ }
- return "" + contentSummary.getLength() + " " + contentSummary.getDirectoryCount() + " " + contentSummary.getFileCount() + " "
- + contentSummary2.getLength() + " " + contentSummary2.getDirectoryCount() + " " + contentSummary2.getFileCount();
+ return "" + length1 + " " + dcount1 + " " + fcount1 + " " + length2 + " " + dcount2 + " " + fcount2;
}
private String getACUStats() throws Exception {
Modified: accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java?rev=1489969&r1=1489968&r2=1489969&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java (original)
+++ accumulo/branches/ACCUMULO-118/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryTest.java Wed Jun 5 17:19:25 2013
@@ -35,7 +35,6 @@ import org.apache.accumulo.core.data.Key
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.FileUtil;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
@@ -45,11 +44,13 @@ import org.apache.accumulo.fate.zookeepe
import org.apache.accumulo.fate.zookeeper.ZooLock.LockWatcher;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.master.state.Assignment;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.tabletserver.TabletServer;
import org.apache.accumulo.server.tabletserver.TabletTime;
+import org.apache.accumulo.server.util.FileUtil;
import org.apache.accumulo.server.util.MetadataTable;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.accumulo.server.zookeeper.ZooLock;
@@ -128,15 +129,15 @@ public class SplitRecoveryTest extends F
Text midRow = new Text(mr);
- SortedMap<String,DataFileValue> splitMapFiles = null;
+ SortedMap<FileRef,DataFileValue> splitMapFiles = null;
for (int i = 0; i < extents.length; i++) {
KeyExtent extent = extents[i];
String tdir = "/dir_" + i;
MetadataTable.addTablet(extent, tdir, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID, zl);
- SortedMap<String,DataFileValue> mapFiles = new TreeMap<String,DataFileValue>();
- mapFiles.put(tdir + "/" + RFile.EXTENSION + "_000_000", new DataFileValue(1000017 + i, 10000 + i));
+ SortedMap<FileRef,DataFileValue> mapFiles = new TreeMap<FileRef,DataFileValue>();
+ mapFiles.put(new FileRef(tdir + "/" + RFile.EXTENSION + "_000_000"), new DataFileValue(1000017 + i, 10000 + i));
if (i == extentToSplit) {
splitMapFiles = mapFiles;
@@ -154,14 +155,14 @@ public class SplitRecoveryTest extends F
splitPartiallyAndRecover(extent, high, low, .4, splitMapFiles, midRow, "localhost:1234", failPoint, zl);
}
- private void splitPartiallyAndRecover(KeyExtent extent, KeyExtent high, KeyExtent low, double splitRatio, SortedMap<String,DataFileValue> mapFiles,
+ private void splitPartiallyAndRecover(KeyExtent extent, KeyExtent high, KeyExtent low, double splitRatio, SortedMap<FileRef,DataFileValue> mapFiles,
Text midRow, String location, int steps, ZooLock zl) throws Exception {
- SortedMap<String,DataFileValue> lowDatafileSizes = new TreeMap<String,DataFileValue>();
- SortedMap<String,DataFileValue> highDatafileSizes = new TreeMap<String,DataFileValue>();
- List<String> highDatafilesToRemove = new ArrayList<String>();
+ SortedMap<FileRef,DataFileValue> lowDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ SortedMap<FileRef,DataFileValue> highDatafileSizes = new TreeMap<FileRef,DataFileValue>();
+ List<FileRef> highDatafilesToRemove = new ArrayList<FileRef>();
- MetadataTable.splitDatafiles(extent.getTableId(), midRow, splitRatio, new HashMap<String,FileUtil.FileInfo>(), mapFiles, lowDatafileSizes,
+ MetadataTable.splitDatafiles(extent.getTableId(), midRow, splitRatio, new HashMap<FileRef,FileUtil.FileInfo>(), mapFiles, lowDatafileSizes,
highDatafileSizes, highDatafilesToRemove);
MetadataTable.splitTablet(high, extent.getPrevEndRow(), splitRatio, SecurityConstants.getSystemCredentials(), zl);
@@ -173,7 +174,7 @@ public class SplitRecoveryTest extends F
writer.update(m);
if (steps >= 1) {
- Map<String,Long> bulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent);
+ Map<FileRef,Long> bulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), extent);
MetadataTable.addNewTablet(low, "/lowDir", instance, lowDatafileSizes, bulkFiles, SecurityConstants.getSystemCredentials(), TabletTime.LOGICAL_TIME_ID
+ "0", -1l, -1l, zl);
}
@@ -187,8 +188,8 @@ public class SplitRecoveryTest extends F
ensureTabletHasNoUnexpectedMetadataEntries(low, lowDatafileSizes);
ensureTabletHasNoUnexpectedMetadataEntries(high, highDatafileSizes);
- Map<String,Long> lowBulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), low);
- Map<String,Long> highBulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), high);
+ Map<FileRef,Long> lowBulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), low);
+ Map<FileRef,Long> highBulkFiles = MetadataTable.getBulkFilesLoaded(SecurityConstants.getSystemCredentials(), high);
if (!lowBulkFiles.equals(highBulkFiles)) {
throw new Exception(" " + lowBulkFiles + " != " + highBulkFiles + " " + low + " " + high);
@@ -202,7 +203,7 @@ public class SplitRecoveryTest extends F
}
}
- private void ensureTabletHasNoUnexpectedMetadataEntries(KeyExtent extent, SortedMap<String,DataFileValue> expectedMapFiles) throws Exception {
+ private void ensureTabletHasNoUnexpectedMetadataEntries(KeyExtent extent, SortedMap<FileRef,DataFileValue> expectedMapFiles) throws Exception {
Scanner scanner = new ScannerImpl(HdfsZooInstance.getInstance(), SecurityConstants.getSystemCredentials(), Constants.METADATA_TABLE_ID,
Constants.NO_AUTHS);
scanner.setRange(extent.toMetadataRange());
@@ -243,17 +244,17 @@ public class SplitRecoveryTest extends F
throw new Exception("Not all expected columns seen " + extent + " " + expectedColumns);
}
- SortedMap<String,DataFileValue> fixedMapFiles = MetadataTable.getDataFileSizes(extent, SecurityConstants.getSystemCredentials());
+ SortedMap<FileRef,DataFileValue> fixedMapFiles = MetadataTable.getDataFileSizes(extent, SecurityConstants.getSystemCredentials());
verifySame(expectedMapFiles, fixedMapFiles);
}
- private void verifySame(SortedMap<String,DataFileValue> datafileSizes, SortedMap<String,DataFileValue> fixedDatafileSizes) throws Exception {
+ private void verifySame(SortedMap<FileRef,DataFileValue> datafileSizes, SortedMap<FileRef,DataFileValue> fixedDatafileSizes) throws Exception {
if (!datafileSizes.keySet().containsAll(fixedDatafileSizes.keySet()) || !fixedDatafileSizes.keySet().containsAll(datafileSizes.keySet())) {
throw new Exception("Key sets not the same " + datafileSizes.keySet() + " != " + fixedDatafileSizes.keySet());
}
- for (Entry<String,DataFileValue> entry : datafileSizes.entrySet()) {
+ for (Entry<FileRef,DataFileValue> entry : datafileSizes.entrySet()) {
DataFileValue dfv = entry.getValue();
DataFileValue otherDfv = fixedDatafileSizes.get(entry.getKey());