You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/10/21 08:57:42 UTC
[accumulo] branch main updated: VolumeManager and Volume internals
cleanup (#1738)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 62110a3 VolumeManager and Volume internals cleanup (#1738)
62110a3 is described below
commit 62110a3c27b9a27d8a7c0bc272aa6778910acad7
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Wed Oct 21 04:57:34 2020 -0400
VolumeManager and Volume internals cleanup (#1738)
* VolumeManager
* remove getDefaultVolume, which was only used to create a temporary
directory for creating tables with split points, and to resolve
relative paths
* replace "default volume" concept with the first available volume
(this is what it was effectively doing anyway... but now it's
implemented more cleanly)
* simplified much of the volume management code in the CreateTable
FaTE operation classes, where creation of temporary files for
storing splits on a pre-split table was being handled
* rename getFileSystem methods throughout that return VolumeManager to
getVolumeManager, and stop passing it as an extra parameter if it is
already available via a ServerContext object
* Volume
* Remove unneeded overloaded Volume.prefixChild(Path)
* Rename Volume.isValidPath(Path) to Volume.containsPath(Path) to
improve readability
* Update javadoc for containsPath to describe what it means for a Path
to be contained
* Update VolumeImplTest to verify behavior of containsPath
* VolumeImpl
* Make protected fields private
* Rename conf to hadoopConf to clarify which config it is
* normalize base path by stripping out trailing slashes
* rename helper method equivalentPaths(Path) to isAncestorPathOf(Path)
for readability in containsPath(Path) implementation
* add more strict argument checking for prefixChild(String)
* fix bug in isAncestorPathOf(Path) that incorrectly concluded /a/pa
is an ancestor of /a/path (/a is an ancestor, but /a/pa is not)
* Improve exception message for prefixChild to include volume name
* VolumeImplTest
* check normalization by testing with trailing slashes
* add more test cases for isAncestorPathOf, including checking for
"breakout" path terms, like ".."
* add test cases for prefixChild and containsPath
* Tablet
* Fix bug in Tablet that wasn't using the Path's FileSystem when
deleting a file, but instead was using an arbitrary FileSystem
* Misc
* Update instance.volumes property description
* Ensure split files get written and read using UTF-8
---
.../accumulo/core/clientImpl/bulk/BulkImport.java | 8 +-
.../org/apache/accumulo/core/conf/Property.java | 4 +-
.../org/apache/accumulo/core/volume/Volume.java | 29 ++++---
.../apache/accumulo/core/volume/VolumeImpl.java | 96 ++++++++++++----------
.../accumulo/core/volume/VolumeImplTest.java | 75 ++++++++++++++++-
.../org/apache/accumulo/server/ServerUtil.java | 8 +-
.../apache/accumulo/server/fs/VolumeManager.java | 8 +-
.../accumulo/server/fs/VolumeManagerImpl.java | 29 ++-----
.../org/apache/accumulo/server/fs/VolumeUtil.java | 5 +-
.../apache/accumulo/server/gc/GcVolumeUtil.java | 15 +---
.../apache/accumulo/gc/SimpleGarbageCollector.java | 6 +-
.../accumulo/gc/SimpleGarbageCollectorTest.java | 4 +-
.../apache/accumulo/master/FateServiceHandler.java | 87 +++++++-------------
.../java/org/apache/accumulo/master/Master.java | 10 ---
.../apache/accumulo/master/tableOps/TableInfo.java | 19 +++--
.../org/apache/accumulo/master/tableOps/Utils.java | 31 +++----
.../accumulo/master/tableOps/create/ChooseDir.java | 24 +++---
.../master/tableOps/create/CreateTable.java | 7 +-
.../master/tableOps/create/FinishCreateTable.java | 10 +--
.../master/tableOps/create/PopulateMetadata.java | 17 ++--
.../org/apache/accumulo/tserver/FileManager.java | 9 +-
.../org/apache/accumulo/tserver/TabletServer.java | 10 +--
.../tserver/TabletServerResourceManager.java | 2 +-
.../org/apache/accumulo/tserver/log/DfsLogger.java | 4 +-
.../accumulo/tserver/log/TabletServerLogger.java | 2 +-
.../accumulo/tserver/tablet/CompactableUtils.java | 6 +-
.../apache/accumulo/tserver/tablet/Compactor.java | 4 +-
.../accumulo/tserver/tablet/DatafileManager.java | 23 +++---
.../accumulo/tserver/tablet/MinorCompactor.java | 4 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 18 ++--
.../tserver/TabletServerSyncCheckTest.java | 3 +-
.../accumulo/tserver/WalRemovalOrderTest.java | 2 +-
.../accumulo/shell/commands/SetIterCommand.java | 3 +-
33 files changed, 296 insertions(+), 286 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index deea676..1a60318 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -192,13 +192,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
* Check path of bulk directory and permissions
*/
private Path checkPath(FileSystem fs, String dir) throws IOException, AccumuloException {
- Path ret;
-
- if (dir.contains(":")) {
- ret = new Path(dir);
- } else {
- ret = fs.makeQualified(new Path(dir));
- }
+ Path ret = dir.contains(":") ? new Path(dir) : fs.makeQualified(new Path(dir));
try {
if (!fs.getFileStatus(ret).isDirectory()) {
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 345efac..9a1d05c 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -106,7 +106,9 @@ public enum Property {
+ " org.apache.accumulo.server.util.ChangeSecret"),
INSTANCE_VOLUMES("instance.volumes", "", PropertyType.STRING,
"A comma separated list of dfs uris to use. Files will be stored across"
- + " these filesystems. If this is empty, then instance.dfs.uri will be used."
+ + " these filesystems. In some situations, the first volume in this list"
+ + " may be treated differently, such as being preferred for writing out"
+ + " temporary files (for example, when creating a pre-split table)."
+ " After adding uris to this list, run 'accumulo init --add-volume' and then"
+ " restart tservers. If entries are removed from this list then tservers"
+ " will need to be restarted. After a uri is removed from the list Accumulo"
diff --git a/core/src/main/java/org/apache/accumulo/core/volume/Volume.java b/core/src/main/java/org/apache/accumulo/core/volume/Volume.java
index baf477b..75c4df5 100644
--- a/core/src/main/java/org/apache/accumulo/core/volume/Volume.java
+++ b/core/src/main/java/org/apache/accumulo/core/volume/Volume.java
@@ -38,26 +38,25 @@ public interface Volume {
String getBasePath();
/**
- * Convert the given Path into a Path that is relative to the base path for this Volume
+ * Convert the given child path into a Path that is relative to the base path for this Volume. The
+ * supplied path should not include any scheme (such as <code>file:</code> or <code>hdfs:</code>),
+ * and should not contain any relative path "breakout" patterns, such as <code>../</code>. If the
+ * path begins with a single slash, it will be preserved while prefixing this volume. If it does
+ * not begin with a single slash, one will be inserted.
*
- * @param p
+ * @param pathString
* The suffix to use
* @return A Path for this Volume with the provided suffix
*/
- Path prefixChild(Path p);
+ Path prefixChild(String pathString);
/**
- * Convert the given child path into a Path that is relative to the base path for this Volume
- *
- * @param p
- * The suffix to use
- * @return A Path for this Volume with the provided suffix
- */
- Path prefixChild(String p);
-
- /**
- * Determine if the Path is valid on this Volume. A Path is valid if it is contained in the
- * Volume's FileSystem and is rooted beneath the basePath
+ * Determine if the Path is contained in Volume. A Path is considered contained if it refers to a
+ * location within the base path for this Volume on the same FileSystem. It can be located at the
+ * base path, or within any sub-directory. Unqualified paths (those without a file system scheme)
+ * will resolve to using the configured Hadoop default file system before comparison. Paths are
+ * not considered "contained" within this Volume if they have any relative path "breakout"
+ * patterns, such as <code>../</code>.
*/
- boolean isValidPath(Path p);
+ boolean containsPath(Path path);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java b/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java
index cafa87d..0c8dd87 100644
--- a/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/volume/VolumeImpl.java
@@ -34,24 +34,30 @@ import org.slf4j.LoggerFactory;
* that filesystem.
*/
public class VolumeImpl implements Volume {
+
private static final Logger log = LoggerFactory.getLogger(VolumeImpl.class);
- protected final FileSystem fs;
- protected final String basePath;
+ private final FileSystem fs;
+ private final String basePath;
private final Configuration hadoopConf;
- public VolumeImpl(Path path, Configuration conf) throws IOException {
- this.fs = requireNonNull(path).getFileSystem(requireNonNull(conf));
- this.basePath = path.toUri().getPath();
- this.hadoopConf = conf;
+ public VolumeImpl(Path path, Configuration hadoopConf) throws IOException {
+ this.fs = requireNonNull(path).getFileSystem(requireNonNull(hadoopConf));
+ this.basePath = stripTrailingSlashes(path.toUri().getPath());
+ this.hadoopConf = hadoopConf;
}
public VolumeImpl(FileSystem fs, String basePath) {
this.fs = requireNonNull(fs);
- this.basePath = requireNonNull(basePath);
+ this.basePath = stripTrailingSlashes(requireNonNull(basePath));
this.hadoopConf = fs.getConf();
}
+ // remove any trailing whitespace or slashes
+ private static String stripTrailingSlashes(String path) {
+ return path.strip().replaceAll("/*$", "");
+ }
+
@Override
public FileSystem getFileSystem() {
return fs;
@@ -63,49 +69,33 @@ public class VolumeImpl implements Volume {
}
@Override
- public Path prefixChild(Path p) {
- return fs.makeQualified(new Path(basePath, p));
- }
-
- @Override
- public boolean isValidPath(Path p) {
- requireNonNull(p);
-
- FileSystem other;
+ public boolean containsPath(Path path) {
+ FileSystem otherFS;
try {
- other = p.getFileSystem(hadoopConf);
+ otherFS = requireNonNull(path).getFileSystem(hadoopConf);
} catch (IOException e) {
- log.warn("Could not determine filesystem from path: {}", p, e);
+ log.warn("Could not determine filesystem from path: {}", path, e);
return false;
}
-
- if (equivalentFileSystems(other)) {
- return equivalentPaths(p);
- }
-
- return false;
+ return equivalentFileSystems(otherFS) && isAncestorPathOf(path);
}
- /**
- * Test whether the provided {@link FileSystem} object reference the same actual filesystem as the
- * member <code>fs</code>.
- *
- * @param other
- * The filesystem to compare
- */
- boolean equivalentFileSystems(FileSystem other) {
- return fs.getUri().equals(other.getUri());
+ // same if the only difference is trailing slashes
+ boolean equivalentFileSystems(FileSystem otherFS) {
+ return stripTrailingSlashes(fs.getUri().toString())
+ .equals(stripTrailingSlashes(otherFS.getUri().toString()));
}
- /**
- * Tests if the provided {@link Path} is rooted inside this volume, contained within
- * <code>basePath</code>.
- *
- * @param other
- * The path to compare
- */
- boolean equivalentPaths(Path other) {
- return other.toUri().getPath().startsWith(basePath);
+ // is ancestor if the path portion without the filesystem scheme
+ // is a subdirectory of this volume's basePath
+ boolean isAncestorPathOf(Path other) {
+ String otherPath = other.toUri().getPath().strip();
+ if (otherPath.startsWith(basePath)) {
+ String otherRemainingPath = otherPath.substring(basePath.length());
+ return otherRemainingPath.isEmpty()
+ || (otherRemainingPath.startsWith("/") && !otherRemainingPath.contains(".."));
+ }
+ return false;
}
@Override
@@ -126,12 +116,28 @@ public class VolumeImpl implements Volume {
@Override
public String toString() {
- return getFileSystem() + " " + basePath;
+ return fs.makeQualified(new Path(basePath)).toString();
}
@Override
- public Path prefixChild(String p) {
- return prefixChild(new Path(basePath, p));
+ public Path prefixChild(String pathString) {
+ String p = requireNonNull(pathString).strip();
+ p = p.startsWith("/") ? p.substring(1) : p;
+ String reason;
+ if (p.isBlank()) {
+ return fs.makeQualified(new Path(basePath));
+ } else if (p.startsWith("/")) {
+ // check for starting with '//'
+ reason = "absolute path";
+ } else if (pathString.contains(":")) {
+ reason = "qualified path";
+ } else if (pathString.contains("..")) {
+ reason = "path contains '..'";
+ } else {
+ return fs.makeQualified(new Path(basePath, p));
+ }
+ throw new IllegalArgumentException(
+ String.format("Cannot prefix %s (%s) with volume %s", pathString, reason, this));
}
}
diff --git a/core/src/test/java/org/apache/accumulo/core/volume/VolumeImplTest.java b/core/src/test/java/org/apache/accumulo/core/volume/VolumeImplTest.java
index 29e30d9..a46766c 100644
--- a/core/src/test/java/org/apache/accumulo/core/volume/VolumeImplTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/volume/VolumeImplTest.java
@@ -22,15 +22,20 @@ import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.net.URI;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
+import org.slf4j.LoggerFactory;
public class VolumeImplTest {
@@ -61,7 +66,7 @@ public class VolumeImplTest {
String basePath = "/accumulo";
expect(fs.getConf()).andReturn(hadoopConf).anyTimes();
- expect(fs.getUri()).andReturn(URI.create("hdfs://myhost:8020")).anyTimes();
+ expect(fs.getUri()).andReturn(URI.create("hdfs://myhost:8020/")).anyTimes();
expect(other.getUri()).andReturn(URI.create("hdfs://myhost:8020")).anyTimes();
replay(fs, other);
@@ -79,7 +84,9 @@ public class VolumeImplTest {
VolumeImpl volume = new VolumeImpl(fs, "/accumulo");
- assertFalse(volume.equivalentPaths(new Path("/something/accumulo")));
+ assertFalse(volume.isAncestorPathOf(new Path("/something/accumulo")));
+ assertFalse(volume.isAncestorPathOf(new Path("/accumulo2")));
+ assertFalse(volume.isAncestorPathOf(new Path("/accumulo/..")));
}
@Test
@@ -90,9 +97,69 @@ public class VolumeImplTest {
VolumeImpl volume = new VolumeImpl(fs, basePath);
// Bare path should match
- assertTrue(volume.equivalentPaths(new Path(basePath)));
+ assertTrue(volume.isAncestorPathOf(new Path(basePath)));
// Prefix should also match
- assertTrue(volume.equivalentPaths(new Path(basePath + "/tables/1/F000001.rf")));
+ assertTrue(volume.isAncestorPathOf(new Path(basePath + "/tables/1/F000001.rf")));
+ }
+
+ @Test
+ public void testPrefixChild() throws IOException {
+ FileSystem fs = new Path("file:///").getFileSystem(new Configuration(false));
+ var volume = new VolumeImpl(fs, "/tmp/accumulo/");
+ assertEquals("file:/tmp/accumulo", volume.toString());
+ // test normalization for effectively empty child
+ Set.of(" ", " ", " ", " .", " ./", " .// ", " ././/./ ").forEach(s -> {
+ assertEquals("file:/tmp/accumulo", volume.prefixChild(s).toString());
+ });
+ // test normalization for single depth child
+ Set.of("/abc", "abc", " abc/ ", " abc/// ", "./abc/.", "./abc").forEach(s -> {
+ assertEquals("file:/tmp/accumulo/abc", volume.prefixChild(s).toString());
+ });
+ // test normalization for multi depth child
+ Set.of("abc/./def/", " abc/def/ ", " abc////def/ ", " ./abc/.//def/. ").forEach(s -> {
+ assertEquals("file:/tmp/accumulo/abc/def", volume.prefixChild(s).toString());
+ });
+ // test failures for absolute paths
+ Set.of("//abc", " //abc ", "///abc").forEach(s -> {
+ var e = assertThrows(IllegalArgumentException.class, () -> {
+ volume.prefixChild(s);
+ LoggerFactory.getLogger(VolumeImplTest.class).error("Should have thrown on " + s);
+ });
+ assertEquals("Cannot prefix " + s + " (absolute path) with volume file:/tmp/accumulo",
+ e.getMessage());
+ });
+ // test failures for qualified paths
+ Set.of("file:/abc", "hdfs://host:1234", " file:/def ").forEach(s -> {
+ var e = assertThrows(IllegalArgumentException.class, () -> {
+ volume.prefixChild(s);
+ LoggerFactory.getLogger(VolumeImplTest.class).error("Should have thrown on " + s);
+ });
+ assertEquals("Cannot prefix " + s + " (qualified path) with volume file:/tmp/accumulo",
+ e.getMessage());
+ });
+ // test failures for breakout paths
+ Set.of("./abc/..", "abc/../def/", "../abc", " .. ").forEach(s -> {
+ var e = assertThrows(IllegalArgumentException.class, () -> {
+ volume.prefixChild(s);
+ LoggerFactory.getLogger(VolumeImplTest.class).error("Should have thrown on " + s);
+ });
+ assertEquals("Cannot prefix " + s + " (path contains '..') with volume file:/tmp/accumulo",
+ e.getMessage());
+ });
+ // quick check to verify with hdfs
+ FileSystem fs2 = new Path("hdfs://127.0.0.1:1234/").getFileSystem(new Configuration(false));
+ var volume2 = new VolumeImpl(fs2, "/tmp/accumulo/");
+ assertEquals("hdfs://127.0.0.1:1234/tmp/accumulo", volume2.toString());
+ assertEquals("hdfs://127.0.0.1:1234/tmp/accumulo/abc", volume2.prefixChild("abc").toString());
+ }
+
+ @Test
+ public void testContains() throws IOException {
+ FileSystem fs = new Path("file:///").getFileSystem(new Configuration(false));
+ var volume = new VolumeImpl(fs, "/tmp/accumulo/");
+ Set.of("abc", " abc/def/ ", " ghi/// ").forEach(s -> {
+ assertTrue(volume.containsPath(volume.prefixChild(s)));
+ });
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java b/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java
index 71be74b..e55dde5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java
@@ -95,14 +95,14 @@ public class ServerUtil {
return getAccumuloPersistentVersion(v.getFileSystem(), path);
}
- public static synchronized int getAccumuloPersistentVersion(VolumeManager fs) {
+ public static synchronized int getAccumuloPersistentVersion(VolumeManager vm) {
// It doesn't matter which Volume is used as they should all have the data version stored
- return getAccumuloPersistentVersion(fs.getVolumes().iterator().next());
+ return getAccumuloPersistentVersion(vm.getFirst());
}
- public static synchronized Path getAccumuloInstanceIdPath(VolumeManager fs) {
+ public static synchronized Path getAccumuloInstanceIdPath(VolumeManager vm) {
// It doesn't matter which Volume is used as they should all have the instance ID stored
- return ServerConstants.getInstanceIdLocation(fs.getVolumes().iterator().next());
+ return ServerConstants.getInstanceIdLocation(vm.getFirst());
}
public static void init(ServerContext context, String application) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
index ed2d4a1..2a53e18 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
@@ -179,12 +179,14 @@ public interface VolumeManager extends AutoCloseable {
boolean canSyncAndFlush(Path path);
/**
- * Fetch the default Volume
+ * Fetch the first configured instance Volume
*/
- Volume getDefaultVolume();
+ default Volume getFirst() {
+ return getVolumes().iterator().next();
+ }
/**
- * Fetch the configured Volumes, excluding the default Volume
+ * Fetch the configured instance Volumes
*/
Collection<Volume> getVolumes();
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index 1fed821..0be0bca 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -72,14 +72,12 @@ public class VolumeManagerImpl implements VolumeManager {
private final Map<String,Volume> volumesByName;
private final Multimap<URI,Volume> volumesByFileSystemUri;
- private final Volume defaultVolume;
private final VolumeChooser chooser;
private final Configuration hadoopConf;
- protected VolumeManagerImpl(Map<String,Volume> volumes, Volume defaultVolume,
- AccumuloConfiguration conf, Configuration hadoopConf) {
+ protected VolumeManagerImpl(Map<String,Volume> volumes, AccumuloConfiguration conf,
+ Configuration hadoopConf) {
this.volumesByName = volumes;
- this.defaultVolume = defaultVolume;
// We may have multiple directories used in a single FileSystem (e.g. testing)
this.volumesByFileSystemUri = invertVolumesByFileSystem(volumesByName);
ensureSyncIsEnabled();
@@ -110,9 +108,10 @@ public class VolumeManagerImpl implements VolumeManager {
public static VolumeManager getLocalForTesting(String localBasePath) throws IOException {
AccumuloConfiguration accConf = DefaultConfiguration.getInstance();
Configuration hadoopConf = new Configuration();
- Volume defaultLocalVolume = new VolumeImpl(FileSystem.getLocal(hadoopConf), localBasePath);
- return new VolumeManagerImpl(Collections.singletonMap("", defaultLocalVolume),
- defaultLocalVolume, accConf, hadoopConf);
+ FileSystem localFS = FileSystem.getLocal(hadoopConf);
+ Volume defaultLocalVolume = new VolumeImpl(localFS, localBasePath);
+ return new VolumeManagerImpl(Collections.singletonMap("", defaultLocalVolume), accConf,
+ hadoopConf);
}
@Override
@@ -244,19 +243,16 @@ public class VolumeManagerImpl implements VolumeManager {
@Override
public FileSystem getFileSystemByPath(Path path) {
- if (!requireNonNull(path).toString().contains(":")) {
- return defaultVolume.getFileSystem();
- }
FileSystem desiredFs;
try {
- desiredFs = path.getFileSystem(hadoopConf);
+ desiredFs = requireNonNull(path).getFileSystem(hadoopConf);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
URI desiredFsUri = desiredFs.getUri();
Collection<Volume> candidateVolumes = volumesByFileSystemUri.get(desiredFsUri);
if (candidateVolumes != null) {
- return candidateVolumes.stream().filter(volume -> volume.isValidPath(path))
+ return candidateVolumes.stream().filter(volume -> volume.containsPath(path))
.map(Volume::getFileSystem).findFirst().orElse(desiredFs);
} else {
log.debug("Could not determine volume for Path: {}", path);
@@ -369,9 +365,7 @@ public class VolumeManagerImpl implements VolumeManager {
}
}
- String uri = volumeStrings.iterator().next();
- Volume defaultVolume = new VolumeImpl(new Path(uri), hadoopConf);
- return new VolumeManagerImpl(volumes, defaultVolume, conf, hadoopConf);
+ return new VolumeManagerImpl(volumes, conf, hadoopConf);
}
@Override
@@ -452,11 +446,6 @@ public class VolumeManagerImpl implements VolumeManager {
}
@Override
- public Volume getDefaultVolume() {
- return defaultVolume;
- }
-
- @Override
public Collection<Volume> getVolumes() {
return volumesByName.values();
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 7e315ac..736bc2a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -57,8 +57,9 @@ public class VolumeUtil {
}
public static Path removeTrailingSlash(Path path) {
- if (path.toString().endsWith("/"))
- return new Path(removeTrailingSlash(path.toString()));
+ String pathStr = path.toString();
+ if (pathStr.endsWith("/"))
+ return new Path(removeTrailingSlash(pathStr));
return path;
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/gc/GcVolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/gc/GcVolumeUtil.java
index a4461cc..65c33b5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/gc/GcVolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/gc/GcVolumeUtil.java
@@ -18,13 +18,12 @@
*/
package org.apache.accumulo.server.gc;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
-import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.Path;
@@ -42,16 +41,8 @@ public class GcVolumeUtil {
public static Collection<Path> expandAllVolumesUri(VolumeManager fs, Path path) {
if (path.toString().startsWith(ALL_VOLUMES_PREFIX)) {
String relPath = path.toString().substring(ALL_VOLUMES_PREFIX.length());
-
- Collection<Volume> volumes = fs.getVolumes();
-
- ArrayList<Path> ret = new ArrayList<>(volumes.size());
- for (Volume vol : volumes) {
- Path volPath = vol.prefixChild(relPath);
- ret.add(volPath);
- }
-
- return ret;
+ return fs.getVolumes().stream().map(vol -> vol.prefixChild(relPath))
+ .collect(Collectors.toList());
} else {
return Collections.singleton(path);
}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index f273d9f..1ca98f0 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -26,7 +26,6 @@ import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -699,7 +698,6 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
static void minimizeDeletes(SortedMap<String,String> confirmedDeletes,
List<String> processedDeletes, VolumeManager fs) {
Set<Path> seenVolumes = new HashSet<>();
- Collection<Volume> volumes = fs.getVolumes();
// when deleting a dir and all files in that dir, only need to delete the dir
// the dir will sort right before the files... so remove the files in this case
@@ -726,8 +724,8 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
if (seenVolumes.contains(vol)) {
sameVol = true;
} else {
- for (Volume cvol : volumes) {
- if (cvol.isValidPath(vol)) {
+ for (Volume cvol : fs.getVolumes()) {
+ if (cvol.containsPath(vol)) {
seenVolumes.add(vol);
sameVol = true;
}
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
index f2f6214..2d571e5 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/SimpleGarbageCollectorTest.java
@@ -145,12 +145,12 @@ public class SimpleGarbageCollectorTest {
@Test
public void testMinimizeDeletes() {
Volume vol1 = createMock(Volume.class);
- expect(vol1.isValidPath(anyObject()))
+ expect(vol1.containsPath(anyObject()))
.andAnswer(() -> getCurrentArguments()[0].toString().startsWith("hdfs://nn1/accumulo"))
.anyTimes();
Volume vol2 = createMock(Volume.class);
- expect(vol2.isValidPath(anyObject()))
+ expect(vol2.containsPath(anyObject()))
.andAnswer(() -> getCurrentArguments()[0].toString().startsWith("hdfs://nn2/accumulo"))
.anyTimes();
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
index 327e752..7f0bdac 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.master;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.Constants.MAX_NAMESPACE_LEN;
import static org.apache.accumulo.core.Constants.MAX_TABLE_NAME_LEN;
import static org.apache.accumulo.master.util.TableValidators.CAN_CLONE;
@@ -169,12 +170,14 @@ class FateServiceHandler implements FateService.Iface {
InitialTableState.valueOf(ByteBufferUtil.toString(arguments.get(2)));
int splitCount = Integer.parseInt(ByteBufferUtil.toString(arguments.get(3)));
validateArgumentCount(arguments, tableOp, SPLIT_OFFSET + splitCount);
- String splitFile = null;
- String splitDirsFile = null;
+ Path splitsPath = null;
+ Path splitsDirsPath = null;
if (splitCount > 0) {
try {
- splitFile = writeSplitsToFile(opid, arguments, splitCount, SPLIT_OFFSET);
- splitDirsFile = createSplitDirsFile(opid);
+ Path tmpDir = mkTempDir(opid);
+ splitsPath = new Path(tmpDir, "splits");
+ splitsDirsPath = new Path(tmpDir, "splitsDirs");
+ writeSplitsToFile(splitsPath, arguments, splitCount, SPLIT_OFFSET);
} catch (IOException e) {
throw new ThriftTableOperationException(null, tableName, tableOp,
TableOperationExceptionType.OTHER,
@@ -196,7 +199,7 @@ class FateServiceHandler implements FateService.Iface {
master.fate.seedTransaction(opid,
new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, timeType, options,
- splitFile, splitCount, splitDirsFile, initialTableState, namespaceId)),
+ splitsPath, splitCount, splitsDirsPath, initialTableState, namespaceId)),
autoCleanup);
break;
@@ -779,65 +782,37 @@ class FateServiceHandler implements FateService.Iface {
/**
* Create a file on the file system to hold the splits to be created at table creation.
*/
- private String writeSplitsToFile(final long opid, final List<ByteBuffer> arguments,
+ private void writeSplitsToFile(Path splitsPath, final List<ByteBuffer> arguments,
final int splitCount, final int splitOffset) throws IOException {
- String opidStr = String.format("%016x", opid);
- String splitsPath = getSplitPath("/tmp/splits-" + opidStr);
- removeAndCreateTempFile(splitsPath);
- try (FSDataOutputStream stream = master.getOutputStream(splitsPath)) {
- writeSplitsToFileSystem(stream, arguments, splitCount, splitOffset);
+ FileSystem fs = splitsPath.getFileSystem(master.getContext().getHadoopConf());
+ try (FSDataOutputStream stream = fs.create(splitsPath)) {
+ // base64 encode because splits can contain binary
+ for (int i = splitOffset; i < splitCount + splitOffset; i++) {
+ byte[] splitBytes = ByteBufferUtil.toBytes(arguments.get(i));
+ String encodedSplit = Base64.getEncoder().encodeToString(splitBytes);
+ stream.write((encodedSplit + '\n').getBytes(UTF_8));
+ }
} catch (IOException e) {
- log.error("Error in FateServiceHandler while writing splits for opid: " + opidStr + ": "
- + e.getMessage());
+ log.error("Error in FateServiceHandler while writing splits to {}: {}", splitsPath,
+ e.getMessage());
throw e;
}
- return splitsPath;
- }
-
- /**
- * Always check for and delete the splits file if it exists to prevent issues in case of server
- * failure and/or FateServiceHandler retries.
- */
- private void removeAndCreateTempFile(String path) throws IOException {
- FileSystem fs = master.getVolumeManager().getDefaultVolume().getFileSystem();
- if (fs.exists(new Path(path)))
- fs.delete(new Path(path), true);
- fs.create(new Path(path));
- }
-
- /**
- * Check for and delete the temp file if it exists to prevent issues in case of server failure
- * and/or FateServiceHandler retries. Then create/recreate the file.
- */
- private String createSplitDirsFile(final long opid) throws IOException {
- String opidStr = String.format("%016x", opid);
- String splitDirPath = getSplitPath("/tmp/splitDirs-" + opidStr);
- removeAndCreateTempFile(splitDirPath);
- return splitDirPath;
}
/**
- * Write the split values to a tmp directory with unique name. Given that it is not known if the
- * supplied splits will be textual or binary, all splits will be encoded to enable proper handling
- * of binary data.
+ * Creates a temporary directory for the given FaTE operation (deleting any existing, to avoid
+ * issues in case of server retry).
+ *
+ * @return the path of the created directory
*/
- private void writeSplitsToFileSystem(final FSDataOutputStream stream,
- final List<ByteBuffer> arguments, final int splitCount, final int splitOffset)
- throws IOException {
- for (int i = splitOffset; i < splitCount + splitOffset; i++) {
- byte[] splitBytes = ByteBufferUtil.toBytes(arguments.get(i));
- String encodedSplit = Base64.getEncoder().encodeToString(splitBytes);
- stream.writeBytes(encodedSplit + '\n');
- }
+ public Path mkTempDir(long opid) throws IOException {
+ Volume vol = master.getVolumeManager().getFirst();
+ Path p = vol.prefixChild("/tmp/fate-" + String.format("%016x", opid));
+ FileSystem fs = vol.getFileSystem();
+ if (fs.exists(p))
+ fs.delete(p, true);
+ fs.mkdirs(p);
+ return p;
}
- /**
- * Get full path to location where initial splits are stored on file system.
- */
- private String getSplitPath(String relPath) {
- Volume defaultVolume = master.getVolumeManager().getDefaultVolume();
- String uri = defaultVolume.getFileSystem().getUri().toString();
- String basePath = defaultVolume.getBasePath();
- return uri + basePath + relPath;
- }
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/Master.java b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
index 54095c2..d73e5f3 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/Master.java
@@ -135,8 +135,6 @@ import org.apache.accumulo.server.util.TableInfoUtil;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.start.classloader.vfs.ContextManager;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -1710,12 +1708,4 @@ public class Master extends AbstractServer
return masterInitialized.get();
}
- public FSDataOutputStream getOutputStream(final String path) throws IOException {
- return getVolumeManager().getDefaultVolume().getFileSystem().create(new Path(path));
- }
-
- public FSDataInputStream getInputStream(final String path) throws IOException {
- return getVolumeManager().getDefaultVolume().getFileSystem().open(new Path(path));
- }
-
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/TableInfo.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/TableInfo.java
index 5a801eb..37d85b8 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/TableInfo.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/TableInfo.java
@@ -25,6 +25,7 @@ import org.apache.accumulo.core.client.admin.InitialTableState;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.hadoop.fs.Path;
public class TableInfo implements Serializable {
@@ -87,20 +88,22 @@ public class TableInfo implements Serializable {
this.user = user;
}
- public String getSplitFile() {
- return splitFile;
+ public Path getSplitPath() {
+ return new Path(splitFile);
}
- public void setSplitFile(String splitFile) {
- this.splitFile = splitFile;
+ // stored as string for Java serialization
+ public void setSplitPath(Path splitPath) {
+ this.splitFile = splitPath == null ? null : splitPath.toString();
}
- public String getSplitDirsFile() {
- return splitDirsFile;
+ public Path getSplitDirsPath() {
+ return new Path(splitDirsFile);
}
- public void setSplitDirsFile(String splitDirsFile) {
- this.splitDirsFile = splitDirsFile;
+ // stored as string for Java serialization
+ public void setSplitDirsPath(Path splitDirsPath) {
+ this.splitDirsFile = splitDirsPath == null ? null : splitDirsPath.toString();
}
public InitialTableState getInitialTableState() {
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
index e2d0274..227f837 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
@@ -20,9 +20,7 @@ package org.apache.accumulo.master.tableOps;
import static java.nio.charset.StandardCharsets.UTF_8;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.math.BigInteger;
import java.util.Base64;
import java.util.SortedSet;
@@ -47,7 +45,8 @@ import org.apache.accumulo.fate.zookeeper.ZooReservation;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.ServerContext;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -195,22 +194,24 @@ public class Utils {
}
/**
- * Given an input stream and a flag indicating if the file info is base64 encoded or not, retrieve
- * the data from a file on the file system. It is assumed that the file is textual and not binary
- * data.
+ * Given a fully-qualified Path and a flag indicating if the file info is base64 encoded or not,
+ * retrieve the data from a file on the file system. It is assumed that the file is textual and
+ * not binary data.
+ *
+ * @param path
+ * the fully-qualified path
*/
- public static SortedSet<Text> getSortedSetFromFile(FSDataInputStream inputStream, boolean encoded)
+ public static SortedSet<Text> getSortedSetFromFile(Master master, Path path, boolean encoded)
throws IOException {
- SortedSet<Text> data = new TreeSet<>();
- try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) {
- String line;
- while ((line = br.readLine()) != null) {
- if (encoded)
- data.add(new Text(Base64.getDecoder().decode(line)));
- else
- data.add(new Text(line));
+ FileSystem fs = path.getFileSystem(master.getContext().getHadoopConf());
+ var data = new TreeSet<Text>();
+ try (var file = new java.util.Scanner(fs.open(path), UTF_8)) {
+ while (file.hasNextLine()) {
+ String line = file.nextLine();
+ data.add(encoded ? new Text(Base64.getDecoder().decode(line)) : new Text(line));
}
}
return data;
}
+
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/ChooseDir.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/ChooseDir.java
index 2533676..4e16f354 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/ChooseDir.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/ChooseDir.java
@@ -18,6 +18,8 @@
*/
package org.apache.accumulo.master.tableOps.create;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import java.io.IOException;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -28,7 +30,6 @@ import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.master.tableOps.TableInfo;
import org.apache.accumulo.master.tableOps.Utils;
-import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -59,8 +60,9 @@ class ChooseDir extends MasterRepo {
@Override
public void undo(long tid, Master master) throws Exception {
- VolumeManager fs = master.getVolumeManager();
- fs.deleteRecursively(new Path(tableInfo.getSplitDirsFile()));
+ Path p = tableInfo.getSplitDirsPath();
+ FileSystem fs = p.getFileSystem(master.getContext().getHadoopConf());
+ fs.delete(p, true);
}
/**
@@ -68,8 +70,7 @@ class ChooseDir extends MasterRepo {
* to the file system for later use during this FATE operation.
*/
private void createTableDirectoriesInfo(Master master) throws IOException {
- SortedSet<Text> splits =
- Utils.getSortedSetFromFile(master.getInputStream(tableInfo.getSplitFile()), true);
+ SortedSet<Text> splits = Utils.getSortedSetFromFile(master, tableInfo.getSplitPath(), true);
SortedSet<Text> tabletDirectoryInfo = createTabletDirectoriesSet(master, splits.size());
writeTabletDirectoriesToFileSystem(master, tabletDirectoryInfo);
}
@@ -78,7 +79,7 @@ class ChooseDir extends MasterRepo {
* Create a set of unique table directories. These will be associated with splits in a follow-on
* FATE step.
*/
- private SortedSet<Text> createTabletDirectoriesSet(Master master, int num) {
+ private static SortedSet<Text> createTabletDirectoriesSet(Master master, int num) {
String tabletDir;
UniqueNameAllocator namer = master.getContext().getUniqueNameAllocator();
SortedSet<Text> splitDirs = new TreeSet<>();
@@ -95,12 +96,13 @@ class ChooseDir extends MasterRepo {
*/
private void writeTabletDirectoriesToFileSystem(Master master, SortedSet<Text> dirs)
throws IOException {
- FileSystem fs = master.getVolumeManager().getDefaultVolume().getFileSystem();
- if (fs.exists(new Path(tableInfo.getSplitDirsFile())))
- fs.delete(new Path(tableInfo.getSplitDirsFile()), true);
- try (FSDataOutputStream stream = master.getOutputStream(tableInfo.getSplitDirsFile())) {
+ Path p = tableInfo.getSplitDirsPath();
+ FileSystem fs = p.getFileSystem(master.getContext().getHadoopConf());
+ if (fs.exists(p))
+ fs.delete(p, true);
+ try (FSDataOutputStream stream = fs.create(p)) {
for (Text dir : dirs)
- stream.writeBytes(dir + "\n");
+ stream.write((dir + "\n").getBytes(UTF_8));
}
}
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/CreateTable.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/CreateTable.java
index 3a4f61b..0fbf439 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/CreateTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/CreateTable.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.master.tableOps.TableInfo;
import org.apache.accumulo.master.tableOps.Utils;
+import org.apache.hadoop.fs.Path;
public class CreateTable extends MasterRepo {
private static final long serialVersionUID = 1L;
@@ -37,7 +38,7 @@ public class CreateTable extends MasterRepo {
private TableInfo tableInfo;
public CreateTable(String user, String tableName, TimeType timeType, Map<String,String> props,
- String splitFile, int splitCount, String splitDirsFile, InitialTableState initialTableState,
+ Path splitPath, int splitCount, Path splitDirsPath, InitialTableState initialTableState,
NamespaceId namespaceId) {
tableInfo = new TableInfo();
tableInfo.setTableName(tableName);
@@ -45,10 +46,10 @@ public class CreateTable extends MasterRepo {
tableInfo.setUser(user);
tableInfo.props = props;
tableInfo.setNamespaceId(namespaceId);
- tableInfo.setSplitFile(splitFile);
+ tableInfo.setSplitPath(splitPath);
tableInfo.setInitialSplitSize(splitCount);
tableInfo.setInitialTableState(initialTableState);
- tableInfo.setSplitDirsFile(splitDirsFile);
+ tableInfo.setSplitDirsPath(splitDirsPath);
}
@Override
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/FinishCreateTable.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/FinishCreateTable.java
index 34be770..aabb739 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/FinishCreateTable.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/FinishCreateTable.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.accumulo.core.client.admin.InitialTableState;
import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.MasterRepo;
@@ -69,10 +68,11 @@ class FinishCreateTable extends MasterRepo {
}
private void cleanupSplitFiles(Master env) throws IOException {
- Volume defaultVolume = env.getVolumeManager().getDefaultVolume();
- FileSystem fs = defaultVolume.getFileSystem();
- fs.delete(new Path(tableInfo.getSplitFile()), true);
- fs.delete(new Path(tableInfo.getSplitDirsFile()), true);
+ // it is sufficient to delete from the parent, because both files are in the same directory, and
+ // we want to delete the directory also
+ Path p = tableInfo.getSplitPath().getParent();
+ FileSystem fs = p.getFileSystem(env.getContext().getHadoopConf());
+ fs.delete(p, true);
}
@Override
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/PopulateMetadata.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/PopulateMetadata.java
index 1eb2356..330645e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/PopulateMetadata.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/create/PopulateMetadata.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
@@ -63,20 +64,18 @@ class PopulateMetadata extends MasterRepo {
}
@Override
- public Repo<Master> call(long tid, Master environment) throws Exception {
+ public Repo<Master> call(long tid, Master env) throws Exception {
KeyExtent extent = new KeyExtent(tableInfo.getTableId(), null, null);
MetadataTableUtil.addTablet(extent, ServerColumnFamily.DEFAULT_TABLET_DIR_NAME,
- environment.getContext(), tableInfo.getTimeType(), environment.getMasterLock());
+ env.getContext(), tableInfo.getTimeType(), env.getMasterLock());
if (tableInfo.getInitialSplitSize() > 0) {
- SortedSet<Text> splits =
- Utils.getSortedSetFromFile(environment.getInputStream(tableInfo.getSplitFile()), true);
- SortedSet<Text> dirs = Utils
- .getSortedSetFromFile(environment.getInputStream(tableInfo.getSplitDirsFile()), false);
+ SortedSet<Text> splits = Utils.getSortedSetFromFile(env, tableInfo.getSplitPath(), true);
+ SortedSet<Text> dirs = Utils.getSortedSetFromFile(env, tableInfo.getSplitDirsPath(), false);
Map<Text,Text> splitDirMap = createSplitDirectoryMap(splits, dirs);
- try (BatchWriter bw = environment.getContext().createBatchWriter("accumulo.metadata")) {
- writeSplitsToMetadataTable(environment.getContext(), tableInfo.getTableId(), splits,
- splitDirMap, tableInfo.getTimeType(), environment.getMasterLock(), bw);
+ try (BatchWriter bw = env.getContext().createBatchWriter(MetadataTable.NAME)) {
+ writeSplitsToMetadataTable(env.getContext(), tableInfo.getTableId(), splits, splitDirMap,
+ tableInfo.getTimeType(), env.getMasterLock(), bw);
}
}
return new FinishCreateTable(tableInfo);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 159a3eb..b92091b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -50,7 +50,6 @@ import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReportingIterator;
import org.apache.accumulo.server.problems.ProblemReports;
@@ -104,8 +103,6 @@ public class FileManager {
private Semaphore filePermits;
- private VolumeManager fs;
-
private Cache<String,Long> fileLenCache;
private long maxIdleTime;
@@ -152,8 +149,7 @@ public class FileManager {
}
- public FileManager(ServerContext context, VolumeManager fs, int maxOpen,
- Cache<String,Long> fileLenCache) {
+ public FileManager(ServerContext context, int maxOpen, Cache<String,Long> fileLenCache) {
if (maxOpen <= 0)
throw new IllegalArgumentException("maxOpen <= 0");
@@ -162,7 +158,6 @@ public class FileManager {
this.filePermits = new Semaphore(maxOpen, false);
this.maxOpen = maxOpen;
- this.fs = fs;
this.openFiles = new HashMap<>();
this.reservedReaders = new HashMap<>();
@@ -304,7 +299,7 @@ public class FileManager {
if (!file.contains(":"))
throw new IllegalArgumentException("Expected uri, got : " + file);
Path path = new Path(file);
- FileSystem ns = fs.getFileSystemByPath(path);
+ FileSystem ns = context.getVolumeManager().getFileSystemByPath(path);
// log.debug("Opening "+file + " path " + path);
FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(path.toString(), ns, ns.getConf(), context.getCryptoService())
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 82e2eb9..2d9fead 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -862,8 +862,8 @@ public class TabletServer extends AbstractServer {
TServerUtils.stopTServer(server);
try {
- log.debug("Closing filesystem");
- getFileSystem().close();
+ log.debug("Closing filesystems");
+ getVolumeManager().close();
} catch (IOException e) {
log.warn("Failed to close filesystem : {}", e.getMessage(), e);
}
@@ -1194,8 +1194,8 @@ public class TabletServer extends AbstractServer {
return new DfsLogger.ServerResources() {
@Override
- public VolumeManager getFileSystem() {
- return TabletServer.this.getFileSystem();
+ public VolumeManager getVolumeManager() {
+ return TabletServer.this.getVolumeManager();
}
@Override
@@ -1213,7 +1213,7 @@ public class TabletServer extends AbstractServer {
return onlineTablets.snapshot().get(extent);
}
- public VolumeManager getFileSystem() {
+ public VolumeManager getVolumeManager() {
return getContext().getVolumeManager();
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index b3fa2b8..a405ab9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -406,7 +406,7 @@ public class TabletServerResourceManager {
fileLenCache =
CacheBuilder.newBuilder().maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)).build();
- fileManager = new FileManager(context, context.getVolumeManager(), maxOpenFiles, fileLenCache);
+ fileManager = new FileManager(context, maxOpenFiles, fileLenCache);
memoryManager = new LargestFirstMemoryManager();
memoryManager.init(context);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 36fd283..c7382bb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -143,7 +143,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
public interface ServerResources {
AccumuloConfiguration getConfiguration();
- VolumeManager getFileSystem();
+ VolumeManager getVolumeManager();
}
private final LinkedBlockingQueue<DfsLogger.LogWork> workQueue = new LinkedBlockingQueue<>();
@@ -421,7 +421,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
String logger = Joiner.on("+").join(address.split(":"));
log.debug("DfsLogger.open() begin");
- VolumeManager fs = conf.getFileSystem();
+ VolumeManager fs = conf.getVolumeManager();
var chooserEnv = new VolumeChooserEnvironmentImpl(ChooserScope.LOGGER, context);
logPath = fs.choose(chooserEnv, ServerConstants.getBaseUris(context)) + Path.SEPARATOR
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 8777d6f..9ea708c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -269,7 +269,7 @@ public class TabletServerLogger {
@Override
public void run() {
final ServerResources conf = tserver.getServerConfig();
- final VolumeManager fs = conf.getFileSystem();
+ final VolumeManager fs = conf.getVolumeManager();
while (!nextLogMaker.isShutdown()) {
log.debug("Creating next WAL");
DfsLogger alog = null;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index b96ea60..505c578 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -108,7 +108,7 @@ public class CompactableUtils {
Set<StoredTabletFile> allFiles) throws IOException {
final Map<StoredTabletFile,Pair<Key,Key>> result = new HashMap<>();
final FileOperations fileFactory = FileOperations.getInstance();
- final VolumeManager fs = tablet.getTabletServer().getFileSystem();
+ final VolumeManager fs = tablet.getTabletServer().getVolumeManager();
for (StoredTabletFile file : allFiles) {
FileSystem ns = fs.getFileSystemByPath(file.getPath());
try (FileSKVIterator openReader = fileFactory.newReaderBuilder()
@@ -150,7 +150,7 @@ public class CompactableUtils {
BlockCache ic = trsm.getIndexCache();
Cache<String,Long> fileLenCache = trsm.getFileLenCache();
MajorCompactionRequest request = new MajorCompactionRequest(tablet.getExtent(),
- CompactableUtils.from(kind), tablet.getTabletServer().getFileSystem(),
+ CompactableUtils.from(kind), tablet.getTabletServer().getVolumeManager(),
tablet.getTableConfiguration(), sc, ic, fileLenCache, tablet.getContext());
request.setFiles(datafiles);
@@ -347,7 +347,7 @@ public class CompactableUtils {
try {
FileOperations fileFactory = FileOperations.getInstance();
Path path = new Path(file.getUri());
- FileSystem ns = tablet.getTabletServer().getFileSystem().getFileSystemByPath(path);
+ FileSystem ns = tablet.getTabletServer().getVolumeManager().getFileSystemByPath(path);
var fiter = fileFactory.newReaderBuilder()
.forFile(path.toString(), ns, ns.getConf(), tablet.getContext().getCryptoService())
.withTableConfiguration(tablet.getTableConfiguration()).seekToBeginning().build();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
index 0f891b7..0da9213 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
@@ -152,7 +152,7 @@ public class Compactor implements Callable<CompactionStats> {
List<IteratorSetting> iterators, int reason, AccumuloConfiguration tableConfiguation) {
this.context = context;
this.extent = tablet.getExtent();
- this.fs = tablet.getTabletServer().getFileSystem();
+ this.fs = context.getVolumeManager();
this.acuTableConf = tableConfiguation;
this.filesToCompact = files;
this.imm = imm;
@@ -165,7 +165,7 @@ public class Compactor implements Callable<CompactionStats> {
startTime = System.currentTimeMillis();
}
- public VolumeManager getFileSystem() {
+ public VolumeManager getVolumeManager() {
return fs;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 9a8b08e..12ce366 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -271,27 +271,24 @@ class DatafileManager {
// rename before putting in metadata table, so files in metadata table should
// always exist
boolean attemptedRename = false;
+ VolumeManager vm = tablet.getTabletServer().getContext().getVolumeManager();
do {
try {
if (dfv.getNumEntries() == 0) {
-
- tablet.getTabletServer().getFileSystem().deleteRecursively(tmpDatafile.getPath());
+ vm.deleteRecursively(tmpDatafile.getPath());
} else {
- if (!attemptedRename
- && tablet.getTabletServer().getFileSystem().exists(newDatafile.getPath())) {
+ if (!attemptedRename && vm.exists(newDatafile.getPath())) {
log.warn("Target map file already exist {}", newDatafile);
throw new RuntimeException("File unexpectedly exists " + newDatafile.getPath());
}
// the following checks for spurious rename failures that succeeded but gave an IoE
- if (attemptedRename
- && tablet.getTabletServer().getFileSystem().exists(newDatafile.getPath())
- && !tablet.getTabletServer().getFileSystem().exists(tmpDatafile.getPath())) {
+ if (attemptedRename && vm.exists(newDatafile.getPath())
+ && !vm.exists(tmpDatafile.getPath())) {
// seems like previous rename succeeded, so break
break;
}
attemptedRename = true;
- rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.getPath(),
- newDatafile.getPath());
+ rename(vm, tmpDatafile.getPath(), newDatafile.getPath());
}
break;
} catch (IOException ioe) {
@@ -402,20 +399,20 @@ class DatafileManager {
TabletFile tmpDatafile, TabletFile newDatafile, Long compactionId, DataFileValue dfv)
throws IOException {
final KeyExtent extent = tablet.getExtent();
+ VolumeManager vm = tablet.getTabletServer().getContext().getVolumeManager();
long t1, t2;
- if (tablet.getTabletServer().getFileSystem().exists(newDatafile.getPath())) {
+ if (vm.exists(newDatafile.getPath())) {
log.error("Target map file already exist " + newDatafile, new Exception());
throw new IllegalStateException("Target map file already exist " + newDatafile);
}
if (dfv.getNumEntries() == 0) {
- tablet.getTabletServer().getFileSystem().deleteRecursively(tmpDatafile.getPath());
+ vm.deleteRecursively(tmpDatafile.getPath());
} else {
// rename before putting in metadata table, so files in metadata table should
// always exist
- rename(tablet.getTabletServer().getFileSystem(), tmpDatafile.getPath(),
- newDatafile.getPath());
+ rename(vm, tmpDatafile.getPath(), newDatafile.getPath());
}
TServerInstance lastLocation = null;
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index 69c7e88..d58a98f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -150,8 +150,8 @@ public class MinorCompactor extends Compactor {
// clean up
try {
- if (getFileSystem().exists(new Path(outputFileName))) {
- getFileSystem().deleteRecursively(new Path(outputFileName));
+ if (getVolumeManager().exists(new Path(outputFileName))) {
+ getVolumeManager().deleteRecursively(new Path(outputFileName));
}
} catch (IOException e) {
log.warn("Failed to delete failed MinC file {} {}", outputFileName, e.getMessage());
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 8f64a10..543df4c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -255,7 +255,7 @@ public class Tablet {
VolumeChooserEnvironment chooserEnv =
new VolumeChooserEnvironmentImpl(extent.tableId(), extent.endRow(), context);
String dirUri =
- tabletServer.getFileSystem().choose(chooserEnv, ServerConstants.getBaseUris(context))
+ tabletServer.getVolumeManager().choose(chooserEnv, ServerConstants.getBaseUris(context))
+ Constants.HDFS_TABLES_DIR + Path.SEPARATOR + extent.tableId() + Path.SEPARATOR
+ dirName;
checkTabletDir(new Path(dirUri));
@@ -272,7 +272,7 @@ public class Tablet {
if (!checkedTabletDirs.contains(path)) {
FileStatus[] files = null;
try {
- files = getTabletServer().getFileSystem().listStatus(path);
+ files = getTabletServer().getVolumeManager().listStatus(path);
} catch (FileNotFoundException ex) {
// ignored
}
@@ -280,7 +280,7 @@ public class Tablet {
if (files == null) {
log.debug("Tablet {} had no dir, creating {}", extent, path);
- getTabletServer().getFileSystem().mkdirs(path);
+ getTabletServer().getVolumeManager().mkdirs(path);
}
checkedTabletDirs.add(path);
}
@@ -352,8 +352,8 @@ public class Tablet {
absPaths.add(ref.getPathStr());
}
- tabletServer.recover(this.getTabletServer().getFileSystem(), extent, logEntries, absPaths,
- m -> {
+ tabletServer.recover(this.getTabletServer().getVolumeManager(), extent, logEntries,
+ absPaths, m -> {
Collection<ColumnUpdate> muts = m.getUpdates();
for (ColumnUpdate columnUpdate : muts) {
if (!columnUpdate.hasTimestamp()) {
@@ -440,16 +440,14 @@ public class Tablet {
private void removeOldTemporaryFiles() {
// remove any temporary files created by a previous tablet server
try {
- Collection<Volume> volumes = getTabletServer().getFileSystem().getVolumes();
- for (Volume volume : volumes) {
+ for (Volume volume : getTabletServer().getVolumeManager().getVolumes()) {
String dirUri = volume.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+ extent.tableId() + Path.SEPARATOR + dirName;
- for (FileStatus tmp : getTabletServer().getFileSystem()
- .globStatus(new Path(dirUri, "*_tmp"))) {
+ for (FileStatus tmp : volume.getFileSystem().globStatus(new Path(dirUri, "*_tmp"))) {
try {
log.debug("Removing old temp file {}", tmp.getPath());
- getTabletServer().getFileSystem().delete(tmp.getPath());
+ volume.getFileSystem().delete(tmp.getPath(), false);
} catch (IOException ex) {
log.error("Unable to remove old temp file " + tmp.getPath() + ": " + ex);
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java
index f5b4d43..9691d13 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/TabletServerSyncCheckTest.java
@@ -67,8 +67,7 @@ public class TabletServerSyncCheckTest {
private class TestVolumeManagerImpl extends VolumeManagerImpl {
public TestVolumeManagerImpl(Map<String,Volume> volumes) {
- super(volumes, volumes.values().iterator().next(),
- new ConfigurationCopy(Collections.emptyMap()), new Configuration());
+ super(volumes, new ConfigurationCopy(Collections.emptyMap()), new Configuration());
}
@Override
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java
index 19c326f..c9b86c4 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/WalRemovalOrderTest.java
@@ -45,7 +45,7 @@ public class WalRemovalOrderTest {
}
@Override
- public VolumeManager getFileSystem() {
+ public VolumeManager getVolumeManager() {
throw new UnsupportedOperationException();
}
};
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java
index 049f268..5e5c937 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.iterators.user.AgeOffFilter;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.ReqVisFilter;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.Shell.Command;
import org.apache.accumulo.shell.ShellCommandException;
@@ -91,7 +92,7 @@ public class SetIterCommand extends Command {
String configuredName;
try {
if (profileOpt != null && (currentTableName == null || currentTableName.isBlank())) {
- tmpTable = "accumulo.metadata";
+ tmpTable = MetadataTable.NAME;
shellState.setTableName(tmpTable);
tables = cl.hasOption(OptUtil.tableOpt().getOpt()) || !currentTableName.isEmpty();
}