You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2014/01/18 02:06:03 UTC
git commit: ACCUMULO-2173 use suffixes when comparing paths
Updated Branches:
refs/heads/1.6.0-SNAPSHOT cac60938f -> 71b2baec7
ACCUMULO-2173 use suffixes when comparing paths
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/71b2baec
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/71b2baec
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/71b2baec
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 71b2baec7dd3df7fc8d3b2e1a6e57c0bb9c7a258
Parents: cac6093
Author: Keith Turner <kt...@apache.org>
Authored: Fri Jan 17 20:03:10 2014 -0500
Committer: Keith Turner <kt...@apache.org>
Committed: Fri Jan 17 20:04:35 2014 -0500
----------------------------------------------------------------------
.../org/apache/accumulo/core/file/FileUtil.java | 436 -------------------
.../org/apache/accumulo/server/fs/FileRef.java | 34 +-
.../apache/accumulo/server/util/FileUtil.java | 46 +-
.../apache/accumulo/server/fs/FileRefTest.java | 104 +++++
.../org/apache/accumulo/tserver/Tablet.java | 10 +-
.../tserver/TabletIteratorEnvironment.java | 2 +-
.../accumulo/tserver/log/SortedLogRecovery.java | 15 +-
.../DefaultCompactionStrategyTest.java | 5 +-
.../SizeLimitCompactionStrategyTest.java | 2 +-
.../tserver/log/SortedLogRecoveryTest.java | 98 ++++-
.../java/org/apache/accumulo/test/VolumeIT.java | 113 +++++
11 files changed, 371 insertions(+), 494 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java b/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
index b06a9bf..0ee16cf 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileUtil.java
@@ -16,39 +16,22 @@
*/
package org.apache.accumulo.core.file;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.file.rfile.RFile;
-import org.apache.accumulo.core.file.rfile.RFileOperations;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.iterators.system.MultiIterator;
-import org.apache.accumulo.core.util.CachedConfiguration;
-import org.apache.accumulo.core.util.LocalityGroupUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
public class FileUtil {
@@ -73,425 +56,6 @@ public class FileUtil {
private static final Logger log = Logger.getLogger(FileUtil.class);
- private static String createTmpDir(AccumuloConfiguration acuConf, FileSystem fs) throws IOException {
- String accumuloDir = acuConf.get(Property.INSTANCE_DFS_DIR);
- Random random = new Random();
- String tmpDir = null;
- while (tmpDir == null) {
- tmpDir = accumuloDir + "/tmp/idxReduce_" + String.format("%09d", random.nextInt(Integer.MAX_VALUE));
-
- try {
- fs.getFileStatus(new Path(tmpDir));
- tmpDir = null;
- continue;
- } catch (FileNotFoundException fne) {
- // found an unused temp directory
- }
-
- fs.mkdirs(new Path(tmpDir));
-
- // try to reserve the tmp dir
- if (!fs.createNewFile(new Path(tmpDir + "/__reserve")))
- tmpDir = null;
- }
-
- return tmpDir;
- }
-
- public static Collection<String> reduceFiles(AccumuloConfiguration acuConf, Configuration conf, FileSystem fs, Text prevEndRow, Text endRow,
- Collection<String> mapFiles, int maxFiles, String tmpDir, int pass) throws IOException {
- ArrayList<String> paths = new ArrayList<String>(mapFiles);
-
- if (paths.size() <= maxFiles)
- return paths;
-
- String newDir = String.format("%s/pass_%04d", tmpDir, pass);
-
- int start = 0;
-
- ArrayList<String> outFiles = new ArrayList<String>();
-
- int count = 0;
-
- while (start < paths.size()) {
- int end = Math.min(maxFiles + start, paths.size());
- List<String> inFiles = paths.subList(start, end);
-
- start = end;
-
- String newMapFile = String.format("%s/%04d." + RFile.EXTENSION, newDir, count++);
-
- outFiles.add(newMapFile);
-
- FileSKVWriter writer = new RFileOperations().openWriter(newMapFile, fs, conf, acuConf);
- writer.startDefaultLocalityGroup();
- List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
-
- FileSKVIterator reader = null;
- try {
- for (String s : inFiles) {
- reader = FileOperations.getInstance().openIndex(s, fs, conf, acuConf);
- iters.add(reader);
- }
-
- MultiIterator mmfi = new MultiIterator(iters, true);
-
- while (mmfi.hasTop()) {
- Key key = mmfi.getTopKey();
-
- boolean gtPrevEndRow = prevEndRow == null || key.compareRow(prevEndRow) > 0;
- boolean lteEndRow = endRow == null || key.compareRow(endRow) <= 0;
-
- if (gtPrevEndRow && lteEndRow)
- writer.append(key, new Value(new byte[0]));
-
- if (!lteEndRow)
- break;
-
- mmfi.next();
- }
- } finally {
- try {
- if (reader != null)
- reader.close();
- } catch (IOException e) {
- log.error(e, e);
- }
-
- for (SortedKeyValueIterator<Key,Value> r : iters)
- try {
- if (r != null)
- ((FileSKVIterator) r).close();
- } catch (IOException e) {
- // continue closing
- log.error(e, e);
- }
-
- try {
- if (writer != null)
- writer.close();
- } catch (IOException e) {
- log.error(e, e);
- throw e;
- }
- }
- }
-
- return reduceFiles(acuConf, conf, fs, prevEndRow, endRow, outFiles, maxFiles, tmpDir, pass + 1);
- }
-
- public static SortedMap<Double,Key> findMidPoint(FileSystem fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
- double minSplit) throws IOException {
- return findMidPoint(fs, acuConf, prevEndRow, endRow, mapFiles, minSplit, true);
- }
-
- public static double estimatePercentageLTE(FileSystem fs, AccumuloConfiguration acuconf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
- Text splitRow) throws IOException {
-
- Configuration conf = CachedConfiguration.getInstance();
-
- String tmpDir = null;
-
- int maxToOpen = acuconf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
- ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(mapFiles.size());
-
- try {
- if (mapFiles.size() > maxToOpen) {
- tmpDir = createTmpDir(acuconf, fs);
-
- log.debug("Too many indexes (" + mapFiles.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir);
-
- long t1 = System.currentTimeMillis();
- mapFiles = reduceFiles(acuconf, conf, fs, prevEndRow, endRow, mapFiles, maxToOpen, tmpDir, 0);
- long t2 = System.currentTimeMillis();
-
- log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (t2 - t1) / 1000.0));
- }
-
- if (prevEndRow == null)
- prevEndRow = new Text();
-
- long numKeys = 0;
-
- numKeys = countIndexEntries(acuconf, prevEndRow, endRow, mapFiles, true, conf, fs, readers);
-
- if (numKeys == 0) {
- // not enough info in the index to answer the question, so instead of going to
- // the data just punt and return .5
- return .5;
- }
-
- List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(readers);
- MultiIterator mmfi = new MultiIterator(iters, true);
-
- // skip the prevendrow
- while (mmfi.hasTop() && mmfi.getTopKey().compareRow(prevEndRow) <= 0) {
- mmfi.next();
- }
-
- int numLte = 0;
-
- while (mmfi.hasTop() && mmfi.getTopKey().compareRow(splitRow) <= 0) {
- numLte++;
- mmfi.next();
- }
-
- if (numLte > numKeys) {
- // something went wrong
- throw new RuntimeException("numLte > numKeys " + numLte + " " + numKeys + " " + prevEndRow + " " + endRow + " " + splitRow + " " + mapFiles);
- }
-
- // do not want to return 0% or 100%, so add 1 and 2 below
- return (numLte + 1) / (double) (numKeys + 2);
-
- } finally {
- cleanupIndexOp(acuconf, tmpDir, fs, readers);
- }
- }
-
- /**
- *
- * @param mapFiles
- * - list MapFiles to find the mid point key
- *
- * ISSUES : This method used the index files to find the mid point. If the map files have different index intervals this method will not return an
- * accurate mid point. Also, it would be tricky to use this method in conjunction with an in memory map because the indexing interval is unknown.
- */
- public static SortedMap<Double,Key> findMidPoint(FileSystem fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
- double minSplit, boolean useIndex) throws IOException {
- Configuration conf = CachedConfiguration.getInstance();
-
- Collection<String> origMapFiles = mapFiles;
-
- String tmpDir = null;
-
- int maxToOpen = acuConf.getCount(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN);
- ArrayList<FileSKVIterator> readers = new ArrayList<FileSKVIterator>(mapFiles.size());
-
- try {
- if (mapFiles.size() > maxToOpen) {
- if (!useIndex)
- throw new IOException("Cannot find mid point using data files, too many " + mapFiles.size());
- tmpDir = createTmpDir(acuConf, fs);
-
- log.debug("Too many indexes (" + mapFiles.size() + ") to open at once for " + endRow + " " + prevEndRow + ", reducing in tmpDir = " + tmpDir);
-
- long t1 = System.currentTimeMillis();
- mapFiles = reduceFiles(acuConf, conf, fs, prevEndRow, endRow, mapFiles, maxToOpen, tmpDir, 0);
- long t2 = System.currentTimeMillis();
-
- log.debug("Finished reducing indexes for " + endRow + " " + prevEndRow + " in " + String.format("%6.2f secs", (t2 - t1) / 1000.0));
- }
-
- if (prevEndRow == null)
- prevEndRow = new Text();
-
- long t1 = System.currentTimeMillis();
-
- long numKeys = 0;
-
- numKeys = countIndexEntries(acuConf, prevEndRow, endRow, mapFiles, tmpDir == null ? useIndex : false, conf, fs, readers);
-
- if (numKeys == 0) {
- if (useIndex) {
- log.warn("Failed to find mid point using indexes, falling back to data files which is slower. No entries between " + prevEndRow + " and " + endRow
- + " for " + mapFiles);
- // need to pass original map files, not possibly reduced indexes
- return findMidPoint(fs, acuConf, prevEndRow, endRow, origMapFiles, minSplit, false);
- }
- throw new IOException("Failed to find mid point, no entries between " + prevEndRow + " and " + endRow + " for " + mapFiles);
- }
-
- List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(readers);
- MultiIterator mmfi = new MultiIterator(iters, true);
-
- // skip the prevendrow
- while (mmfi.hasTop() && mmfi.getTopKey().compareRow(prevEndRow) <= 0)
- mmfi.next();
-
- // read half of the keys in the index
- TreeMap<Double,Key> ret = new TreeMap<Double,Key>();
- Key lastKey = null;
- long keysRead = 0;
-
- Key keyBeforeMidPoint = null;
- long keyBeforeMidPointPosition = 0;
-
- while (keysRead < numKeys / 2) {
- if (lastKey != null && !lastKey.equals(mmfi.getTopKey(), PartialKey.ROW) && (keysRead - 1) / (double) numKeys >= minSplit) {
- keyBeforeMidPoint = new Key(lastKey);
- keyBeforeMidPointPosition = keysRead - 1;
- }
-
- if (lastKey == null)
- lastKey = new Key();
-
- lastKey.set(mmfi.getTopKey());
-
- keysRead++;
-
- // consume minimum
- mmfi.next();
- }
-
- if (keyBeforeMidPoint != null)
- ret.put(keyBeforeMidPointPosition / (double) numKeys, keyBeforeMidPoint);
-
- long t2 = System.currentTimeMillis();
-
- log.debug(String.format("Found midPoint from indexes in %6.2f secs.%n", (t2 - t1) / 1000.0));
-
- ret.put(.5, mmfi.getTopKey());
-
- // sanity check
- for (Key key : ret.values()) {
- boolean inRange = (key.compareRow(prevEndRow) > 0 && (endRow == null || key.compareRow(endRow) < 1));
- if (!inRange) {
- throw new IOException("Found mid point is not in range " + key + " " + prevEndRow + " " + endRow + " " + mapFiles);
- }
- }
-
- return ret;
- } finally {
- cleanupIndexOp(acuConf, tmpDir, fs, readers);
- }
- }
-
- private static void cleanupIndexOp(AccumuloConfiguration acuConf, String tmpDir, FileSystem fs, ArrayList<FileSKVIterator> readers) throws IOException {
- // close all of the index sequence files
- for (FileSKVIterator r : readers) {
- try {
- if (r != null)
- r.close();
- } catch (IOException e) {
- // okay, try to close the rest anyway
- log.error(e, e);
- }
- }
-
- if (tmpDir != null) {
- String tmpPrefix = acuConf.get(Property.INSTANCE_DFS_DIR) + "/tmp";
- if (tmpDir.startsWith(tmpPrefix))
- fs.delete(new Path(tmpDir), true);
- else
- log.error("Did not delete tmp dir because it wasn't a tmp dir " + tmpDir);
- }
- }
-
- private static long countIndexEntries(AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles, boolean useIndex,
- Configuration conf, FileSystem fs, ArrayList<FileSKVIterator> readers) throws IOException {
-
- long numKeys = 0;
-
- // count the total number of index entries
- for (String path : mapFiles) {
- FileSKVIterator reader = null;
- try {
- if (useIndex)
- reader = FileOperations.getInstance().openIndex(path, fs, conf, acuConf);
- else
- reader = FileOperations.getInstance().openReader(path, new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false, fs, conf,
- acuConf);
-
- while (reader.hasTop()) {
- Key key = reader.getTopKey();
- if (endRow != null && key.compareRow(endRow) > 0)
- break;
- else if (prevEndRow == null || key.compareRow(prevEndRow) > 0)
- numKeys++;
-
- reader.next();
- }
- } finally {
- try {
- if (reader != null)
- reader.close();
- } catch (IOException e) {
- log.error(e, e);
- }
- }
-
- if (useIndex)
- readers.add(FileOperations.getInstance().openIndex(path, fs, conf, acuConf));
- else
- readers.add(FileOperations.getInstance().openReader(path, new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false, fs, conf,
- acuConf));
-
- }
- return numKeys;
- }
-
- public static Map<String,FileInfo> tryToGetFirstAndLastRows(FileSystem fs, AccumuloConfiguration acuConf, Set<String> mapfiles) {
-
- HashMap<String,FileInfo> mapFilesInfo = new HashMap<String,FileInfo>();
-
- Configuration conf = CachedConfiguration.getInstance();
-
- long t1 = System.currentTimeMillis();
-
- for (String mapfile : mapfiles) {
-
- FileSKVIterator reader = null;
- try {
- reader = FileOperations.getInstance().openReader(mapfile, false, fs, conf, acuConf);
-
- Key firstKey = reader.getFirstKey();
- if (firstKey != null) {
- mapFilesInfo.put(mapfile, new FileInfo(firstKey, reader.getLastKey()));
- }
-
- } catch (IOException ioe) {
- log.warn("Failed to read map file to determine first and last key : " + mapfile, ioe);
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException ioe) {
- log.warn("failed to close " + mapfile, ioe);
- }
- }
- }
-
- }
-
- long t2 = System.currentTimeMillis();
-
- log.debug(String.format("Found first and last keys for %d map files in %6.2f secs", mapfiles.size(), (t2 - t1) / 1000.0));
-
- return mapFilesInfo;
- }
-
- public static WritableComparable<Key> findLastKey(FileSystem fs, AccumuloConfiguration acuConf, Collection<String> mapFiles) throws IOException {
- Key lastKey = null;
-
- Configuration conf = CachedConfiguration.getInstance();
-
- for (String path : mapFiles) {
- FileSKVIterator reader = FileOperations.getInstance().openReader(path, true, fs, conf, acuConf);
-
- try {
- if (!reader.hasTop())
- // file is empty, so there is no last key
- continue;
-
- Key key = reader.getLastKey();
-
- if (lastKey == null || key.compareTo(lastKey) > 0)
- lastKey = key;
- } finally {
- try {
- if (reader != null)
- reader.close();
- } catch (IOException e) {
- log.error(e, e);
- }
- }
- }
-
- return lastKey;
-
- }
-
private static class MLong {
public MLong(long i) {
l = i;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
index dade501..c0bb275 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java
@@ -17,6 +17,7 @@
package org.apache.accumulo.server.fs;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.server.fs.VolumeManager.FileType;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -28,22 +29,22 @@ import org.apache.hadoop.io.Text;
* metadata tables.
*/
public class FileRef implements Comparable<FileRef> {
- String metaReference; // something like ../2/d-00000/A00001.rf
- Path fullReference; // something like hdfs://nn:9001/accumulo/tables/2/d-00000/A00001.rf
+ private String metaReference; // something like ../2/d-00000/A00001.rf
+ private Path fullReference; // something like hdfs://nn:9001/accumulo/tables/2/d-00000/A00001.rf
+ private Path suffix;
public FileRef(VolumeManager fs, Key key) {
- metaReference = key.getColumnQualifier().toString();
- fullReference = fs.getFullPath(key);
+ this(key.getColumnQualifier().toString(), fs.getFullPath(key));
}
public FileRef(String metaReference, Path fullReference) {
this.metaReference = metaReference;
this.fullReference = fullReference;
+ this.suffix = extractSuffix(fullReference);
}
public FileRef(String path) {
- this.metaReference = path;
- this.fullReference = new Path(path);
+ this(path, new Path(path));
}
public String toString() {
@@ -58,14 +59,31 @@ public class FileRef implements Comparable<FileRef> {
return new Text(metaReference);
}
+ // Returns the part of the path below the table directory (e.g. 2a/t-0003/C0004.rf) so the volume prefix is ignored.
+ static Path extractSuffix(Path path) {
+ // compare whole path components; a substring search also matches file or dir names that merely contain the name
+ String tablesDir = FileType.TABLE.getDirectory();
+ Path parent = path.getParent();
+ Path grandparent = parent == null ? null : parent.getParent();
+ if (grandparent == null)
+ throw new IllegalArgumentException("Invalid table path " + path);
+ if (grandparent.getName().equals(tablesDir))
+ return new Path(parent.getName() + "/" + path.getName());
+
+ Path greatGrandparent = grandparent.getParent();
+ if (greatGrandparent == null || !greatGrandparent.getName().equals(tablesDir))
+ throw new IllegalArgumentException("Invalid table path " + path);
+ return new Path(grandparent.getName() + "/" + parent.getName() + "/" + path.getName());
+ }
+
@Override
public int compareTo(FileRef o) {
- return path().compareTo(o.path());
+ return suffix.compareTo(o.suffix);
}
@Override
public int hashCode() {
- return path().hashCode();
+ return suffix.hashCode();
}
@Override
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
index b410dc1..8e38cbd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -81,7 +82,7 @@ public class FileUtil {
Path result = null;
while (result == null) {
- result = new Path(accumuloDir + "/tmp/idxReduce_" + String.format("%09d", (int) (Math.random() * Integer.MAX_VALUE)));
+ result = new Path(accumuloDir + "/tmp/idxReduce_" + String.format("%09d", new Random().nextInt(Integer.MAX_VALUE)));
try {
fs.getFileStatus(result);
@@ -100,9 +101,9 @@ public class FileUtil {
return result;
}
- public static Collection<FileRef> reduceFiles(AccumuloConfiguration acuConf, Configuration conf, VolumeManager fs, Text prevEndRow, Text endRow,
- Collection<FileRef> mapFiles, int maxFiles, Path tmpDir, int pass) throws IOException {
- ArrayList<FileRef> paths = new ArrayList<FileRef>(mapFiles);
+ public static Collection<String> reduceFiles(AccumuloConfiguration acuConf, Configuration conf, VolumeManager fs, Text prevEndRow, Text endRow,
+ Collection<String> mapFiles, int maxFiles, Path tmpDir, int pass) throws IOException {
+ ArrayList<String> paths = new ArrayList<String>(mapFiles);
if (paths.size() <= maxFiles)
return paths;
@@ -111,29 +112,29 @@ public class FileUtil {
int start = 0;
- ArrayList<FileRef> outFiles = new ArrayList<FileRef>();
+ ArrayList<String> outFiles = new ArrayList<String>();
int count = 0;
while (start < paths.size()) {
int end = Math.min(maxFiles + start, paths.size());
- List<FileRef> inFiles = paths.subList(start, end);
+ List<String> inFiles = paths.subList(start, end);
start = end;
- FileRef newMapFile = new FileRef(String.format("%s/%04d.%s", newDir, count++, RFile.EXTENSION));
+ String newMapFile = String.format("%s/%04d.%s", newDir, count++, RFile.EXTENSION);
outFiles.add(newMapFile);
- FileSystem ns = fs.getFileSystemByPath(newMapFile.path());
+ FileSystem ns = fs.getFileSystemByPath(new Path(newMapFile));
FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), acuConf);
writer.startDefaultLocalityGroup();
List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
FileSKVIterator reader = null;
try {
- for (FileRef s : inFiles) {
- ns = fs.getFileSystemByPath(s.path());
- reader = FileOperations.getInstance().openIndex(s.path().toString(), ns, ns.getConf(), acuConf);
+ for (String s : inFiles) {
+ ns = fs.getFileSystemByPath(new Path(s));
+ reader = FileOperations.getInstance().openIndex(s, ns, ns.getConf(), acuConf);
iters.add(reader);
}
@@ -182,12 +183,12 @@ public class FileUtil {
return reduceFiles(acuConf, conf, fs, prevEndRow, endRow, outFiles, maxFiles, tmpDir, pass + 1);
}
- public static SortedMap<Double,Key> findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles,
+ public static SortedMap<Double,Key> findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
double minSplit) throws IOException {
return findMidPoint(fs, acuConf, prevEndRow, endRow, mapFiles, minSplit, true);
}
- public static double estimatePercentageLTE(VolumeManager fs, AccumuloConfiguration acuconf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles,
+ public static double estimatePercentageLTE(VolumeManager fs, AccumuloConfiguration acuconf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
Text splitRow) throws IOException {
Configuration conf = CachedConfiguration.getInstance();
@@ -259,11 +260,11 @@ public class FileUtil {
* ISSUES : This method used the index files to find the mid point. If the map files have different index intervals this method will not return an
* accurate mid point. Also, it would be tricky to use this method in conjunction with an in memory map because the indexing interval is unknown.
*/
- public static SortedMap<Double,Key> findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles,
+ public static SortedMap<Double,Key> findMidPoint(VolumeManager fs, AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles,
double minSplit, boolean useIndex) throws IOException {
Configuration conf = CachedConfiguration.getInstance();
- Collection<FileRef> origMapFiles = mapFiles;
+ Collection<String> origMapFiles = mapFiles;
Path tmpDir = null;
@@ -380,15 +381,15 @@ public class FileUtil {
}
}
- private static long countIndexEntries(AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<FileRef> mapFiles, boolean useIndex,
+ private static long countIndexEntries(AccumuloConfiguration acuConf, Text prevEndRow, Text endRow, Collection<String> mapFiles, boolean useIndex,
Configuration conf, VolumeManager fs, ArrayList<FileSKVIterator> readers) throws IOException {
long numKeys = 0;
// count the total number of index entries
- for (FileRef ref : mapFiles) {
+ for (String ref : mapFiles) {
FileSKVIterator reader = null;
- Path path = ref.path();
+ Path path = new Path(ref);
FileSystem ns = fs.getFileSystemByPath(path);
try {
if (useIndex)
@@ -548,4 +549,13 @@ public class FileUtil {
return results;
}
+ /** Returns, in iteration order, the string form of {@code path()} for each of the given file references. */
+ public static Collection<String> toPathStrings(Collection<FileRef> refs) {
+ ArrayList<String> pathStrings = new ArrayList<String>();
+ for (FileRef ref : refs)
+ pathStrings.add(ref.path().toString());
+
+ return pathStrings;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/server/base/src/test/java/org/apache/accumulo/server/fs/FileRefTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/FileRefTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/FileRefTest.java
new file mode 100644
index 0000000..4ad0ea3
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/fs/FileRefTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that FileRef suffix extraction, equality, hashing and ordering ignore the volume prefix and redundant slashes.
+ */
+public class FileRefTest{
+
+ private void testBadTableSuffix(String badPath) {
+ try {
+ FileRef.extractSuffix(new Path(badPath));
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+ Assert.assertTrue(e.getMessage().contains(badPath));
+ }
+ }
+
+ @Test
+ public void testSuffixes() {
+ Assert.assertEquals(new Path("2a/t-0003/C0004.rf"), FileRef.extractSuffix(new Path("hdfs://nn1/accumulo/tables/2a/t-0003/C0004.rf")));
+ Assert.assertEquals(new Path("2a/t-0003/C0004.rf"), FileRef.extractSuffix(new Path("hdfs://nn1/accumulo/tables/2a/t-0003//C0004.rf")));
+ Assert.assertEquals(new Path("2a/t-0003"), FileRef.extractSuffix(new Path("hdfs://nn1/accumulo/tables/2a/t-0003")));
+ Assert.assertEquals(new Path("2a/t-0003"), FileRef.extractSuffix(new Path("hdfs://nn1/accumulo/tables/2a/t-0003/")));
+
+ testBadTableSuffix("t-0003/C0004.rf");
+ testBadTableSuffix("../t-0003/C0004.rf");
+ testBadTableSuffix("2a/t-0003");
+ testBadTableSuffix("2a/t-0003/C0004.rf");
+ testBadTableSuffix("2a/t-0003/C0004.rf");
+ testBadTableSuffix("hdfs://nn3/accumulo/2a/t-0003/C0004.rf");
+ testBadTableSuffix("hdfs://nn3/accumulo/2a/t-0003");
+ testBadTableSuffix("hdfs://nn3/tables/accumulo/2a/t-0003/C0004.rf");
+ }
+
+ @Test
+ public void testEqualsAndHash() {
+ Assert.assertEquals(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf"), new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0004.rf"));
+ Assert.assertEquals(new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0004.rf"), new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0004.rf"));
+ Assert.assertNotEquals(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0005.rf"), new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0004.rf"));
+ Assert.assertNotEquals(new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0005.rf"), new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0004.rf"));
+
+ HashMap<FileRef,String> refMap = new HashMap<FileRef,String>();
+ refMap.put(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf"), "7");
+ refMap.put(new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0005.rf"), "8");
+
+ Assert.assertNull(refMap.get(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0006.rf")));
+
+ Assert.assertEquals(refMap.get(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf")), "7");
+ Assert.assertEquals(refMap.get(new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0004.rf")), "7");
+ Assert.assertEquals(refMap.get(new FileRef("hdfs://1.2.3.4//accumulo/tables/2a//t-0003//C0004.rf")), "7");
+ Assert.assertEquals(refMap.get(new FileRef("hdfs://nn1/accumulo/tables/2a//t-0003//C0004.rf")), "7");
+
+ Assert.assertEquals(refMap.get(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0005.rf")), "8");
+ Assert.assertEquals(refMap.get(new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0005.rf")), "8");
+ Assert.assertEquals(refMap.get(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a//t-0003/C0005.rf")), "8");
+ Assert.assertEquals(refMap.get(new FileRef("hdfs://nn1/accumulo/tables//2a/t-0003/C0005.rf")), "8");
+ }
+
+ @Test
+ public void testCompareTo() {
+ Assert.assertTrue(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf").compareTo(new FileRef(
+ "hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf")) == 0);
+ Assert
+ .assertTrue(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf").compareTo(new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0004.rf")) == 0);
+ Assert.assertTrue(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf")
+ .compareTo(new FileRef("hdfs://nn1/accumulo/tables//2a/t-0003//C0004.rf")) == 0);
+
+ Assert.assertTrue(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf").compareTo(new FileRef(
+ "hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0005.rf")) < 0);
+ Assert
+ .assertTrue(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf").compareTo(new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0005.rf")) < 0);
+ Assert.assertTrue(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0004.rf")
+ .compareTo(new FileRef("hdfs://nn1/accumulo/tables//2a/t-0003//C0005.rf")) < 0);
+
+ Assert.assertTrue(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0006.rf").compareTo(new FileRef(
+ "hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0005.rf")) > 0);
+ Assert
+ .assertTrue(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0006.rf").compareTo(new FileRef("hdfs://nn1/accumulo/tables/2a/t-0003/C0005.rf")) > 0);
+ Assert.assertTrue(new FileRef("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/C0006.rf")
+ .compareTo(new FileRef("hdfs://nn1/accumulo/tables//2a/t-0003//C0005.rf")) > 0);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index c4a2770..e457ed5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -1159,7 +1159,7 @@ public class Tablet {
break;
}
- FileRef ref = new FileRef(entry.getKey().getColumnQualifier().toString(), fs.getFullPath(entry.getKey()));
+ FileRef ref = new FileRef(fs, entry.getKey());
datafiles.put(ref, new DataFileValue(entry.getValue().get()));
}
}
@@ -1199,9 +1199,7 @@ public class Tablet {
for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) {
Key key = entry.getKey();
if (key.getRow().equals(row) && key.getColumnFamily().equals(ScanFileColumnFamily.NAME)) {
- String meta = key.getColumnQualifier().toString();
- Path path = fs.getFullPath(extent.getTableId().toString(), meta);
- scanFiles.add(new FileRef(meta, path));
+ scanFiles.add(new FileRef(fs, key));
}
}
@@ -2974,7 +2972,7 @@ public class Tablet {
try {
// we should make .25 below configurable
- keys = FileUtil.findMidPoint(fs, tabletServer.getSystemConfiguration(), extent.getPrevEndRow(), extent.getEndRow(), files, .25);
+ keys = FileUtil.findMidPoint(fs, tabletServer.getSystemConfiguration(), extent.getPrevEndRow(), extent.getEndRow(), FileUtil.toPathStrings(files), .25);
} catch (IOException e) {
log.error("Failed to find midpoint " + e.getMessage());
return null;
@@ -3531,7 +3529,7 @@ public class Tablet {
else {
Text tsp = new Text(sp);
splitPoint = new SplitRowSpec(FileUtil.estimatePercentageLTE(fs, tabletServer.getSystemConfiguration(), extent.getPrevEndRow(), extent.getEndRow(),
- datafileManager.getFiles(), tsp), tsp);
+ FileUtil.toPathStrings(datafileManager.getFiles()), tsp), tsp);
}
if (splitPoint == null || splitPoint.row == null) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
index 5d2c850..e13594d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletIteratorEnvironment.java
@@ -25,8 +25,8 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
-import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.system.MultiIterator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.server.fs.FileRef;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index 612f8ee..2c6d415 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -24,6 +24,7 @@ import static org.apache.accumulo.tserver.logger.LogEvents.MUTATION;
import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -136,7 +137,19 @@ public class SortedLogRecovery {
}
}
+ /**
+  * Reduces a file path to its last two components, i.e. the name of the parent directory followed by the file name.
+  *
+  * @param pathString
+  *          path of a file; it must have a depth of at least two
+  * @return the parent directory name and the file name joined by "/"
+  * @throws IllegalArgumentException
+  *           if the path has a depth of less than two
+  */
+ private String getPathSuffix(String pathString) {
+   Path filePath = new Path(pathString);
+   if (filePath.depth() >= 2) {
+     Path parentDir = filePath.getParent();
+     return parentDir.getName() + "/" + filePath.getName();
+   }
+   throw new IllegalArgumentException("Bad path " + pathString);
+ }
+
int findLastStartToFinish(MultiReader reader, int fileno, KeyExtent extent, Set<String> tabletFiles, LastStartToFinish lastStartToFinish) throws IOException, EmptyMapFileException, UnusedException {
+
+ HashSet<String> suffixes = new HashSet<String>();
+ for (String path : tabletFiles)
+ suffixes.add(getPathSuffix(path));
+
// Scan for tableId for this extent (should always be in the log)
LogFileKey key = new LogFileKey();
LogFileValue value = new LogFileValue();
@@ -195,7 +208,7 @@ public class SortedLogRecovery {
// Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table.
log.debug("minor compaction into " + key.filename + " finished, but was still in the METADATA");
- if (tabletFiles.contains(key.filename))
+ if (suffixes.contains(getPathSuffix(key.filename)))
lastStartToFinish.update(-1);
} else if (key.event == COMPACTION_FINISH) {
if (key.seq <= lastStartToFinish.lastStart)
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
index cdb0ad5..6414a03 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/DefaultCompactionStrategyTest.java
@@ -157,7 +157,7 @@ public class DefaultCompactionStrategyTest {
private MajorCompactionRequest createRequest(KeyExtent extent, MajorCompactionReason reason, Object... objs) throws IOException {
Map<FileRef,DataFileValue> files = new HashMap<FileRef,DataFileValue>();
for (int i = 0; i < objs.length; i += 2) {
- files.put(new FileRef((String) objs[i]), new DataFileValue(((Number) objs[i + 1]).longValue(), 0));
+ files.put(new FileRef("hdfs://nn1/accumulo/tables/5/t-0001/" + (String) objs[i]), new DataFileValue(((Number) objs[i + 1]).longValue(), 0));
}
return new TestCompactionRequest(extent, reason, files);
}
@@ -176,7 +176,8 @@ public class DefaultCompactionStrategyTest {
private static Set<String> asSet(Collection<String> strings) {
HashSet<String> result = new HashSet<String>();
- result.addAll(strings);
+ for (String string : strings)
+ result.add("hdfs://nn1/accumulo/tables/5/t-0001/" + string);
return result;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java
index 128777d..abab34f 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/compaction/SizeLimitCompactionStrategyTest.java
@@ -37,7 +37,7 @@ public class SizeLimitCompactionStrategyTest {
HashMap<FileRef,DataFileValue> ret = new HashMap<FileRef,DataFileValue>();
for (int i = 0; i < sa.length; i += 2) {
- ret.put(new FileRef(sa[i]), new DataFileValue(AccumuloConfiguration.getMemoryInBytes(sa[i + 1]), 1));
+ ret.put(new FileRef("hdfs://nn1/accumulo/tables/5/t-0001/" + sa[i]), new DataFileValue(AccumuloConfiguration.getMemoryInBytes(sa[i + 1]), 1));
}
return ret;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index f335ad8..359bfa1 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -153,16 +153,16 @@ public class SortedLogRecoveryTest {
Mutation m2 = new ServerMutation(new Text("row2"));
m2.put(cf, cq, value);
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, ignored),};
+ createKeyValue(COMPACTION_START, 3, 1, "/t1/f1"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, ignored),};
KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 4, 1, "somefile"), createKeyValue(MUTATION, 7, 1, m),};
+ createKeyValue(COMPACTION_START, 4, 1, "/t1/f1"), createKeyValue(MUTATION, 7, 1, m),};
KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 0, 2, "23"), createKeyValue(DEFINE_TABLET, 1, 2, extent),
- createKeyValue(COMPACTION_START, 5, 2, "newfile"), createKeyValue(COMPACTION_FINISH, 6, 2, null), createKeyValue(MUTATION, 3, 2, ignored),
+ createKeyValue(COMPACTION_START, 5, 2, "/t1/f2"), createKeyValue(COMPACTION_FINISH, 6, 2, null), createKeyValue(MUTATION, 3, 2, ignored),
createKeyValue(MUTATION, 4, 2, ignored),};
KeyValue entries4[] = new KeyValue[] {createKeyValue(OPEN, 0, 3, "69"), createKeyValue(DEFINE_TABLET, 1, 3, extent),
createKeyValue(MUTATION, 2, 3, ignored), createKeyValue(MUTATION, 3, 3, ignored), createKeyValue(MUTATION, 4, 3, ignored),};
KeyValue entries5[] = new KeyValue[] {createKeyValue(OPEN, 0, 4, "70"), createKeyValue(DEFINE_TABLET, 1, 4, extent),
- createKeyValue(COMPACTION_START, 3, 4, "thisfile"), createKeyValue(MUTATION, 2, 4, ignored), createKeyValue(MUTATION, 6, 4, m2),};
+ createKeyValue(COMPACTION_START, 3, 4, "/t1/f3"), createKeyValue(MUTATION, 2, 4, ignored), createKeyValue(MUTATION, 6, 4, m2),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
logs.put("entries", entries);
@@ -194,15 +194,16 @@ public class SortedLogRecoveryTest {
Mutation m4 = new ServerMutation(new Text("row4"));
m4.put(cf, cq, value);
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, ignored),};
+ createKeyValue(COMPACTION_START, 3, 1, "/t1/f1"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, ignored),};
KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "2"), createKeyValue(DEFINE_TABLET, 6, 1, extent),
createKeyValue(MUTATION, 7, 1, ignored),};
KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 8, -1, "3"), createKeyValue(DEFINE_TABLET, 9, 1, extent),
- createKeyValue(COMPACTION_FINISH, 10, 1, null), createKeyValue(COMPACTION_START, 12, 1, "newfile"), createKeyValue(COMPACTION_FINISH, 13, 1, null),
+ createKeyValue(COMPACTION_FINISH, 10, 1, "/t1/f1"), createKeyValue(COMPACTION_START, 12, 1, "/t1/f2"),
+ createKeyValue(COMPACTION_FINISH, 13, 1, "/t1/f2"),
// createKeyValue(COMPACTION_FINISH, 14, 1, null),
createKeyValue(MUTATION, 11, 1, ignored), createKeyValue(MUTATION, 15, 1, m), createKeyValue(MUTATION, 16, 1, m2),};
KeyValue entries4[] = new KeyValue[] {createKeyValue(OPEN, 17, -1, "4"), createKeyValue(DEFINE_TABLET, 18, 1, extent),
- createKeyValue(COMPACTION_START, 20, 1, "file"), createKeyValue(MUTATION, 19, 1, m3), createKeyValue(MUTATION, 21, 1, m4),};
+ createKeyValue(COMPACTION_START, 20, 1, "/t1/f3"), createKeyValue(MUTATION, 19, 1, m3), createKeyValue(MUTATION, 21, 1, m4),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
logs.put("entries", entries);
logs.put("entries2", entries2);
@@ -236,7 +237,7 @@ public class SortedLogRecoveryTest {
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent), createKeyValue(MUTATION, 1, 1, ignored),
createKeyValue(MUTATION, 3, 1, m),};
KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, 1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 2, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 3, 1, "somefile"), createKeyValue(MUTATION, 3, 1, m2),};
+ createKeyValue(COMPACTION_START, 2, 1, "/t1/f1"), createKeyValue(COMPACTION_FINISH, 3, 1, "/t1/f1"), createKeyValue(MUTATION, 3, 1, m2),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
logs.put("entries", entries);
@@ -300,7 +301,7 @@ public class SortedLogRecoveryTest {
Mutation m = new ServerMutation(new Text("row1"));
m.put(cf, cq, value);
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 4, 1, null), createKeyValue(MUTATION, 2, 1, ignored),
+ createKeyValue(COMPACTION_START, 3, 1, "/t1/f1"), createKeyValue(COMPACTION_FINISH, 4, 1, null), createKeyValue(MUTATION, 2, 1, ignored),
createKeyValue(MUTATION, 5, 1, m),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
logs.put("testlog", entries);
@@ -319,7 +320,7 @@ public class SortedLogRecoveryTest {
Mutation m = new ServerMutation(new Text("row1"));
m.put(cf, cq, value);
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored),};
+ createKeyValue(COMPACTION_START, 3, 1, "/t1/f1"), createKeyValue(MUTATION, 2, 1, ignored),};
KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 4, -1, "1"), createKeyValue(DEFINE_TABLET, 5, 1, extent),
createKeyValue(COMPACTION_FINISH, 6, 1, null), createKeyValue(MUTATION, 7, 1, m),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
@@ -342,7 +343,7 @@ public class SortedLogRecoveryTest {
Mutation m2 = new ServerMutation(new Text("row2"));
m2.put(cf, cq, new Value("123".getBytes()));
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, m),};
+ createKeyValue(COMPACTION_START, 3, 1, "/t1/f1"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, m),};
KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "1"), createKeyValue(DEFINE_TABLET, 6, 1, extent),
createKeyValue(COMPACTION_FINISH, 7, 1, null), createKeyValue(MUTATION, 8, 1, m2),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
@@ -366,7 +367,7 @@ public class SortedLogRecoveryTest {
Mutation m2 = new ServerMutation(new Text("row2"));
m2.put(cf, cq, new Value("123".getBytes()));
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_FINISH, 2, 1, null), createKeyValue(COMPACTION_START, 4, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 6, 1, null),
+ createKeyValue(COMPACTION_FINISH, 2, 1, null), createKeyValue(COMPACTION_START, 4, 1, "/t1/f1"), createKeyValue(COMPACTION_FINISH, 6, 1, null),
createKeyValue(MUTATION, 3, 1, ignored), createKeyValue(MUTATION, 5, 1, m), createKeyValue(MUTATION, 7, 1, m2),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
logs.put("entries", entries);
@@ -390,7 +391,7 @@ public class SortedLogRecoveryTest {
Mutation m3 = new ServerMutation(new Text("row3"));
m3.put(cf, cq, value);
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 3, 1, "somefile"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, m),};
+ createKeyValue(COMPACTION_START, 3, 1, "/t1/f1"), createKeyValue(MUTATION, 2, 1, ignored), createKeyValue(MUTATION, 4, 1, m),};
KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "1"), createKeyValue(DEFINE_TABLET, 6, 1, extent), createKeyValue(MUTATION, 7, 1, m2),};
KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 8, -1, "1"), createKeyValue(DEFINE_TABLET, 9, 1, extent),
createKeyValue(COMPACTION_FINISH, 10, 1, null), createKeyValue(MUTATION, 11, 1, m3),};
@@ -415,7 +416,7 @@ public class SortedLogRecoveryTest {
Mutation m2 = new ServerMutation(new Text("row2"));
m2.put(cf, cq, value);
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 30, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 32, 1, "somefile"), createKeyValue(MUTATION, 29, 1, m1),
+ createKeyValue(COMPACTION_START, 30, 1, "/t1/f1"), createKeyValue(COMPACTION_FINISH, 32, 1, "/t1/f1"), createKeyValue(MUTATION, 29, 1, m1),
createKeyValue(MUTATION, 30, 1, m2),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
logs.put("testlog", entries);
@@ -437,9 +438,9 @@ public class SortedLogRecoveryTest {
Mutation m3 = new ServerMutation(new Text("row3"));
m3.put(cf, cq, value);
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 2, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 4, 1, null), createKeyValue(MUTATION, 3, 1, m),};
+ createKeyValue(COMPACTION_START, 2, 1, "/t1/f1"), createKeyValue(COMPACTION_FINISH, 4, 1, null), createKeyValue(MUTATION, 3, 1, m),};
KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "1"), createKeyValue(DEFINE_TABLET, 6, 1, extent),
- createKeyValue(COMPACTION_START, 8, 1, "somefile"), createKeyValue(MUTATION, 7, 1, m2), createKeyValue(MUTATION, 9, 1, m3),};
+ createKeyValue(COMPACTION_START, 8, 1, "/t1/f1"), createKeyValue(MUTATION, 7, 1, m2), createKeyValue(MUTATION, 9, 1, m3),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
logs.put("entries", entries);
logs.put("entries2", entries2);
@@ -470,14 +471,14 @@ public class SortedLogRecoveryTest {
Mutation m6 = new ServerMutation(new Text("row6"));
m6.put(cf, cq, value);
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 4, 1, "somefile"),
+ createKeyValue(COMPACTION_START, 4, 1, "/t1/f1"),
// createKeyValue(COMPACTION_FINISH, 5, 1, null),
createKeyValue(MUTATION, 2, 1, m), createKeyValue(MUTATION, 3, 1, m2),};
KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 5, -1, "2"), createKeyValue(DEFINE_TABLET, 6, 1, extent), createKeyValue(MUTATION, 7, 1, m3),
createKeyValue(MUTATION, 8, 1, m4),};
KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 9, -1, "3"), createKeyValue(DEFINE_TABLET, 10, 1, extent),
// createKeyValue(COMPACTION_FINISH, 11, 1, null),
- createKeyValue(COMPACTION_START, 12, 1, "somefile"),
+ createKeyValue(COMPACTION_START, 12, 1, "/t1/f1"),
// createKeyValue(COMPACTION_FINISH, 14, 1, null),
// createKeyValue(COMPACTION_START, 15, 1, "somefile"),
// createKeyValue(COMPACTION_FINISH, 17, 1, null),
@@ -517,10 +518,10 @@ public class SortedLogRecoveryTest {
Mutation m5 = new ServerMutation(new Text("row5"));
m5.put(cf, cq, value);
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 2, 1, "somefile"), createKeyValue(COMPACTION_FINISH, 3, 1, null), createKeyValue(MUTATION, 1, 1, ignored),
+ createKeyValue(COMPACTION_START, 2, 1, "/t1/f1"), createKeyValue(COMPACTION_FINISH, 3, 1, null), createKeyValue(MUTATION, 1, 1, ignored),
createKeyValue(MUTATION, 3, 1, m), createKeyValue(MUTATION, 3, 1, m2), createKeyValue(MUTATION, 3, 1, m3),};
KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "2"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(COMPACTION_START, 2, 1, "somefile2"), createKeyValue(MUTATION, 3, 1, m4), createKeyValue(MUTATION, 3, 1, m5),};
+ createKeyValue(COMPACTION_START, 2, 1, "/t1/f12"), createKeyValue(MUTATION, 3, 1, m4), createKeyValue(MUTATION, 3, 1, m5),};
Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
logs.put("entries", entries);
logs.put("entries2", entries2);
@@ -547,7 +548,7 @@ public class SortedLogRecoveryTest {
m.put("foo", "bar", "v1");
KeyValue entries[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 1, extent),
- createKeyValue(DEFINE_TABLET, 1, 2, extent), createKeyValue(MUTATION, 2, 2, ignored), createKeyValue(COMPACTION_START, 3, 2, "somefile"),
+ createKeyValue(DEFINE_TABLET, 1, 2, extent), createKeyValue(MUTATION, 2, 2, ignored), createKeyValue(COMPACTION_START, 3, 2, "/t1/f1"),
createKeyValue(MUTATION, 4, 2, m), createKeyValue(COMPACTION_FINISH, 6, 2, null),};
Arrays.sort(entries);
@@ -601,4 +602,59 @@ public class SortedLogRecoveryTest {
Assert.assertEquals(1, mutations.size());
Assert.assertEquals(m, mutations.get(0));
}
+
+ /**
+  * Recovers a single log that lists one mutation before and one mutation after a COMPACTION_START entry for the given file, then checks which mutations are
+  * returned.
+  *
+  * @param startMatches
+  *          true when only the second mutation is expected back; false when both mutations are expected back
+  * @param compactionStartFile
+  *          file name stored in the COMPACTION_START entry
+  * @param tabletFiles
+  *          files passed to recovery as the tablet's files
+  */
+ private void runPathTest(boolean startMatches, String compactionStartFile, String... tabletFiles) throws IOException {
+   Mutation first = new ServerMutation(new Text("row1"));
+   first.put("foo", "bar", "v1");
+   Mutation second = new ServerMutation(new Text("row1"));
+   second.put("foo", "bar", "v2");
+
+   KeyValue[] logEntries = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"), createKeyValue(DEFINE_TABLET, 1, 2, extent),
+       createKeyValue(MUTATION, 2, 2, first), createKeyValue(COMPACTION_START, 3, 2, compactionStartFile), createKeyValue(MUTATION, 4, 2, second),};
+   Arrays.sort(logEntries);
+
+   Map<String,KeyValue[]> logs = new TreeMap<String,KeyValue[]>();
+   logs.put("entries", logEntries);
+
+   HashSet<String> knownFiles = new HashSet<String>(Arrays.asList(tabletFiles));
+   List<Mutation> recovered = recover(logs, knownFiles, extent);
+
+   if (startMatches) {
+     // only the second mutation should have been recovered
+     Assert.assertEquals(1, recovered.size());
+     Assert.assertEquals(second, recovered.get(0));
+     return;
+   }
+
+   // both mutations should have been recovered, in order
+   Assert.assertEquals(2, recovered.size());
+   Assert.assertEquals(first, recovered.get(0));
+   Assert.assertEquals(second, recovered.get(1));
+ }
+
+ @Test
+ public void testPaths() throws IOException {
+ // test having different paths for the same file. This can happen as a result of upgrade or user changing configuration
+ runPathTest(false, "/t1/f1", "/t1/f0");
+ runPathTest(true, "/t1/f1", "/t1/f0", "/t1/f1");
+
+ String aliases[] = new String[] {"/t1/f1", "hdfs://nn1/accumulo/tables/8/t1/f1", "hdfs://1.2.3.4/accumulo/tables/8/t1/f1",
+ "hdfs://1.2.3.4//accumulo//tables//8//t1//f1"};
+ String others[] = new String[] {"/t1/f0", "hdfs://nn1/accumulo/tables/8/t1/f2", "hdfs://1.2.3.4//accumulo//tables//8//t1//f3",
+ "hdfs://nn1/accumulo/tables/8/t1/t1", "hdfs://nn1/accumulo/tables/8/f1/f1"};
+
+ for (String alias1 : aliases) {
+ for (String alias2 : aliases) {
+ runPathTest(true, alias1, alias2);
+ for (String other : others) {
+ runPathTest(true, alias1, other, alias2);
+ runPathTest(true, alias1, alias2, other);
+ }
+ }
+ }
+
+ for (String alias1 : aliases) {
+ for (String other : others) {
+ runPathTest(false, alias1, other);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/71b2baec/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index f5b1ddb..2201ad2 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
@@ -34,16 +35,22 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.DiskUsage;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -66,9 +73,16 @@ public class VolumeIT extends ConfigurableMacIT {
v2 = new Path("file://" + v2f.getAbsolutePath());
}
+ @After
+ public void clearDirs() throws IOException {
+ FileUtils.deleteDirectory(new File(v1.toUri()));
+ FileUtils.deleteDirectory(new File(v2.toUri()));
+ }
+
@Override
public void configure(MiniAccumuloConfigImpl cfg) {
// Run MAC on two locations in the local file system
+ cfg.setProperty(Property.INSTANCE_DFS_URI, v1.toString());
cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
super.configure(cfg);
}
@@ -120,4 +134,103 @@ public class VolumeIT extends ConfigurableMacIT {
assertTrue(usage > 700 && usage < 800);
}
+ /**
+  * Reads every entry from the scanner, renders it as "row:family:qualifier:value" and asserts that the sorted result equals the sorted expected list.
+  *
+  * @param expected
+  *          expected entries in "row:family:qualifier:value" form; this list is sorted in place
+  * @param createScanner
+  *          scanner to read the actual entries from
+  */
+ private void verifyData(List<String> expected, Scanner createScanner) {
+   List<String> seen = new ArrayList<String>();
+
+   for (Entry<Key,Value> kv : createScanner) {
+     Key key = kv.getKey();
+     String column = key.getColumnFamily() + ":" + key.getColumnQualifier();
+     seen.add(key.getRow() + ":" + column + ":" + kv.getValue());
+   }
+
+   // compare independent of the order in which entries were added or scanned
+   Collections.sort(seen);
+   Collections.sort(expected);
+
+   Assert.assertEquals(expected, seen);
+ }
+
+ /**
+ * Checks that a table stays readable when some of its data file entries in the metadata table are replaced by relative paths of the form "/dir/file", and
+ * that no such short entries remain after a compaction.
+ */
+ @Test
+ public void testRelativePaths() throws Exception {
+
+ List<String> expected = new ArrayList<String>();
+
+ Connector connector = getConnector();
+ String tableName = getTableNames(1)[0];
+ connector.tableOperations().create(tableName, false);
+
+ String tableId = connector.tableOperations().tableIdMap().get(tableName);
+
+ SortedSet<Text> partitions = new TreeSet<Text>();
+ // with some splits
+ for (String s : "c,g,k,p,s,v".split(","))
+ partitions.add(new Text(s));
+
+ connector.tableOperations().addSplits(tableName, partitions);
+
+ BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
+
+ // create two files in each tablet
+
+ String[] rows = "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",");
+ // first round: value "1" for every row, followed by a table flush
+ for (String s : rows) {
+ Mutation m = new Mutation(s);
+ m.put("cf1", "cq1", "1");
+ bw.addMutation(m);
+ expected.add(s + ":cf1:cq1:1");
+ }
+
+ bw.flush();
+ connector.tableOperations().flush(tableName, null, null, true);
+
+ // second round: value "2" for every row, followed by another table flush
+ for (String s : rows) {
+ Mutation m = new Mutation(s);
+ m.put("cf1", "cq1", "2");
+ bw.addMutation(m);
+ expected.add(s + ":cf1:cq1:2");
+ }
+
+ bw.close();
+ connector.tableOperations().flush(tableName, null, null, true);
+
+ // sanity check before the metadata entries are touched
+ verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+ // the table is offline while its metadata entries are rewritten
+ connector.tableOperations().offline(tableName, true);
+
+ // root needs write permission on the metadata table for the mutations below
+ connector.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+
+ // scan only the data file entries of this table
+ Scanner metaScanner = connector.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ metaScanner.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+ metaScanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+
+ BatchWriter mbw = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+
+ // replace each file entry that starts with v1 by an entry holding only "/<parent dir>/<file name>", keeping the original value
+ // NOTE(review): presumably only v1 entries are rewritten because configure() sets INSTANCE_DFS_URI to v1 - confirm
+ for (Entry<Key,Value> entry : metaScanner) {
+ String cq = entry.getKey().getColumnQualifier().toString();
+ if (cq.startsWith(v1.toString())) {
+ Path path = new Path(cq);
+ String relPath = "/" + path.getParent().getName() + "/" + path.getName();
+ Mutation fileMut = new Mutation(entry.getKey().getRow());
+ fileMut.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier());
+ fileMut.put(entry.getKey().getColumnFamily().toString(), relPath, entry.getValue().toString());
+ mbw.addMutation(fileMut);
+ }
+ }
+
+ mbw.close();
+
+ connector.tableOperations().online(tableName, true);
+
+ // the data must still be readable with the rewritten entries in place
+ verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+ connector.tableOperations().compact(tableName, null, null, true, true);
+
+ // and it must still be readable after the compaction
+ verifyData(expected, connector.createScanner(tableName, Authorizations.EMPTY));
+
+ // after the compaction every remaining file entry must have a path depth greater than two, i.e. none of the "/dir/file" entries written above may remain
+ for (Entry<Key,Value> entry : metaScanner) {
+ String cq = entry.getKey().getColumnQualifier().toString();
+ Path path = new Path(cq);
+ Assert.assertTrue("relative path not deleted " + path.toString(), path.depth() > 2);
+ }
+
+ }
}