You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2020/09/23 20:36:12 UTC
[accumulo] branch main updated: Remove INSTANCE_DFS_URI/INSTANCE_DFS_DIR (#1711)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 618ba7a Remove INSTANCE_DFS_URI/INSTANCE_DFS_DIR (#1711)
618ba7a is described below
commit 618ba7a9f88342ea7c8d692ce19df34d88bf07c8
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Wed Sep 23 16:35:59 2020 -0400
Remove INSTANCE_DFS_URI/INSTANCE_DFS_DIR (#1711)
This fixes #1397
* Remove usages of INSTANCE_DFS_URI and INSTANCE_DFS_DIR properties
* Fail startup if INSTANCE_VOLUMES isn't set
* Simplify some Volume management utility classes (mostly
VolumeConfiguration)
* Ensure files for bulk import test classes are qualified with the
filesystem URI for the filesystem they are using in the test
* Inline and remove redundant getUsableDir method in
AccumuloClusterHarness
---
.../accumulo/core/clientImpl/OfflineIterator.java | 5 +-
.../core/clientImpl/TableOperationsImpl.java | 14 +--
.../accumulo/core/clientImpl/bulk/BulkImport.java | 7 +-
.../accumulo/core/volume/VolumeConfiguration.java | 130 ++++-----------------
.../apache/accumulo/core/conf/PropertyTest.java | 10 +-
.../apache/accumulo/cluster/AccumuloCluster.java | 3 +-
.../standalone/StandaloneAccumuloCluster.java | 11 +-
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 12 +-
.../miniclusterImpl/MiniAccumuloConfigImpl.java | 9 +-
.../MiniAccumuloConfigImplTest.java | 7 +-
.../apache/accumulo/server/ServerConstants.java | 17 ++-
.../org/apache/accumulo/server/ServerInfo.java | 2 +-
.../org/apache/accumulo/server/ServerUtil.java | 3 +-
.../server/client/ClientServiceHandler.java | 9 +-
.../apache/accumulo/server/fs/VolumeManager.java | 10 +-
.../accumulo/server/fs/VolumeManagerImpl.java | 7 +-
.../apache/accumulo/server/init/Initialize.java | 94 ++++-----------
.../apache/accumulo/server/util/ZooKeeperMain.java | 7 --
.../org/apache/accumulo/server/util/ZooZap.java | 5 +-
.../accumulo/server/client/BulkImporterTest.java | 7 +-
.../accumulo/server/fs/VolumeManagerImplTest.java | 5 -
.../accumulo/server/init/InitializeTest.java | 69 ++++-------
.../apache/accumulo/server/util/FileUtilTest.java | 4 +-
.../accumulo/server/util/TServerUtilsTest.java | 2 +-
.../master/tableOps/bulkVer1/BulkImport.java | 12 +-
.../master/tableOps/bulkVer2/PrepBulkImport.java | 12 +-
.../tableOps/tableImport/CreateImportDir.java | 3 +-
.../master/upgrade/RootFilesUpgradeTest.java | 4 +-
.../org/apache/accumulo/tserver/TabletServer.java | 19 ++-
.../accumulo/tserver/ThriftClientHandler.java | 31 ++---
.../org/apache/accumulo/tserver/log/LogSorter.java | 12 +-
.../tserver/replication/ReplicationProcessor.java | 14 +--
.../tserver/replication/ReplicationWorker.java | 12 +-
.../tserver/log/TestUpgradePathForWALogs.java | 21 +++-
.../replication/ReplicationProcessorTest.java | 54 ++++-----
.../accumulo/harness/AccumuloClusterHarness.java | 10 --
.../accumulo/test/BulkImportSequentialRowsIT.java | 2 +-
.../apache/accumulo/test/BulkImportVolumeIT.java | 3 +-
.../org/apache/accumulo/test/ImportExportIT.java | 3 +-
.../java/org/apache/accumulo/test/VolumeIT.java | 5 -
.../apache/accumulo/test/functional/BulkIT.java | 2 +-
.../apache/accumulo/test/functional/BulkOldIT.java | 2 +-
.../test/functional/BulkSplitOptimizationIT.java | 2 +-
.../accumulo/test/functional/CompactionIT.java | 3 +-
.../accumulo/test/performance/NullTserver.java | 2 +-
45 files changed, 228 insertions(+), 449 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
index 95c618c..8b152ee 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
@@ -156,7 +156,6 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
private ClientContext context;
private ScannerOptions options;
private ArrayList<SortedKeyValueIterator<Key,Value>> readers;
- private AccumuloConfiguration config;
public OfflineIterator(ScannerOptions options, ClientContext context,
Authorizations authorizations, Text table, Range range) {
@@ -174,7 +173,6 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
this.readers = new ArrayList<>();
try {
- config = new ConfigurationCopy(context.instanceOperations().getSiteConfiguration());
nextTablet();
while (iter != null && !iter.hasTop())
@@ -310,8 +308,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
// TODO need to close files - ACCUMULO-1303
for (TabletFile file : absFiles) {
- FileSystem fs =
- VolumeConfiguration.getVolume(file.getPathStr(), conf, config).getFileSystem();
+ FileSystem fs = VolumeConfiguration.fileSystemForPath(file.getPathStr(), conf);
FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.getPathStr(), fs, conf, CryptoServiceFactory.newDefaultInstance())
.withTableConfiguration(acuTableConf).build();
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 6c02fee..185f6ea 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -1198,18 +1198,8 @@ public class TableOperationsImpl extends TableOperationsHelper {
private Path checkPath(String dir, String kind, String type)
throws IOException, AccumuloException, AccumuloSecurityException {
- Path ret;
- Map<String,String> props = context.instanceOperations().getSystemConfiguration();
- AccumuloConfiguration conf = new ConfigurationCopy(props);
-
- FileSystem fs =
- VolumeConfiguration.getVolume(dir, context.getHadoopConf(), conf).getFileSystem();
-
- if (dir.contains(":")) {
- ret = new Path(dir);
- } else {
- ret = fs.makeQualified(new Path(dir));
- }
+ FileSystem fs = VolumeConfiguration.fileSystemForPath(dir, context.getHadoopConf());
+ Path ret = dir.contains(":") ? new Path(dir) : fs.makeQualified(new Path(dir));
try {
if (!fs.getFileStatus(ret).isDirectory()) {
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index 44fee8f..deea676 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -62,7 +62,6 @@ import org.apache.accumulo.core.clientImpl.bulk.Bulk.FileInfo;
import org.apache.accumulo.core.clientImpl.bulk.Bulk.Files;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ClientProperty;
-import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
@@ -124,11 +123,7 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
TableId tableId = Tables.getTableId(context, tableName);
- Map<String,String> props = context.instanceOperations().getSystemConfiguration();
- AccumuloConfiguration conf = new ConfigurationCopy(props);
-
- FileSystem fs =
- VolumeConfiguration.getVolume(dir, context.getHadoopConf(), conf).getFileSystem();
+ FileSystem fs = VolumeConfiguration.fileSystemForPath(dir, context.getHadoopConf());
Path srcPath = checkPath(fs, dir);
diff --git a/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java b/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java
index b5e0438..c12fd60 100644
--- a/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/volume/VolumeConfiguration.java
@@ -18,12 +18,10 @@
*/
package org.apache.accumulo.core.volume;
-import static java.util.Objects.requireNonNull;
-
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -36,117 +34,39 @@ import org.apache.hadoop.fs.Path;
public class VolumeConfiguration {
- public static Volume getVolume(String path, Configuration conf, AccumuloConfiguration acuconf)
- throws IOException {
- if (requireNonNull(path).contains(":")) {
- // An absolute path
- return new VolumeImpl(new Path(path), conf);
- } else {
- // A relative path
- return getDefaultVolume(conf, acuconf);
- }
- }
-
- public static Volume getDefaultVolume(Configuration conf, AccumuloConfiguration acuconf)
- throws IOException {
- @SuppressWarnings("deprecation")
- String uri = acuconf.get(Property.INSTANCE_DFS_URI);
-
- // By default pull from INSTANCE_DFS_URI, falling back to the Hadoop defined
- // default filesystem (fs.defaultFS or the deprecated fs.default.name)
- if ("".equals(uri))
- return create(FileSystem.get(conf), acuconf);
- else
- try {
- return create(FileSystem.get(new URI(uri), conf), acuconf);
- } catch (URISyntaxException e) {
- throw new IOException(e);
- }
- }
-
- /**
- * @see org.apache.accumulo.core.volume.VolumeConfiguration#getVolumeUris(AccumuloConfiguration,Configuration)
- */
- @Deprecated(since = "1.3.0")
- public static String getConfiguredBaseDir(AccumuloConfiguration conf,
- Configuration hadoopConfig) {
- String singleNamespace = conf.get(Property.INSTANCE_DFS_DIR);
- String dfsUri = conf.get(Property.INSTANCE_DFS_URI);
- String baseDir;
-
- if (dfsUri == null || dfsUri.isEmpty()) {
- try {
- baseDir = FileSystem.get(hadoopConfig).getUri() + singleNamespace;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- } else {
- if (!dfsUri.contains(":"))
- throw new IllegalArgumentException("Expected fully qualified URI for "
- + Property.INSTANCE_DFS_URI.getKey() + " got " + dfsUri);
- baseDir = dfsUri + singleNamespace;
- }
- return baseDir;
+ public static FileSystem fileSystemForPath(String path, Configuration conf) throws IOException {
+ return path.contains(":") ? new Path(path).getFileSystem(conf) : FileSystem.get(conf);
}
- public static Set<String> getVolumeUris(AccumuloConfiguration conf, Configuration hadoopConfig) {
- String ns = conf.get(Property.INSTANCE_VOLUMES);
-
- // preserve configuration order using LinkedHashSet
- ArrayList<String> configuredBaseDirs = new ArrayList<>();
-
- if (ns == null || ns.isEmpty()) {
- // Fall back to using the old config values
- configuredBaseDirs.add(getConfiguredBaseDir(conf, hadoopConfig));
- } else {
- String[] namespaces = ns.split(",");
- for (String namespace : namespaces) {
- if (!namespace.contains(":")) {
- throw new IllegalArgumentException("Expected fully qualified URI for "
- + Property.INSTANCE_VOLUMES.getKey() + " got " + namespace);
- }
-
- try {
- // pass through URI to unescape hex encoded chars (e.g. convert %2C to "," char)
- configuredBaseDirs.add(new Path(new URI(namespace)).toString());
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException(Property.INSTANCE_VOLUMES.getKey() + " contains "
- + namespace + " which has a syntax error", e);
- }
- }
- }
-
- LinkedHashSet<String> deduplicated = new LinkedHashSet<>();
- deduplicated.addAll(configuredBaseDirs);
- if (deduplicated.isEmpty()) {
+ public static Set<String> getVolumeUris(AccumuloConfiguration conf) {
+ String volumes = conf.get(Property.INSTANCE_VOLUMES);
+ if (volumes == null || volumes.isBlank()) {
throw new IllegalArgumentException(
- Property.INSTANCE_VOLUMES.getKey() + " contains no volumes (" + ns + ")");
+ "Missing required property " + Property.INSTANCE_VOLUMES.getKey());
}
- if (deduplicated.size() < configuredBaseDirs.size()) {
+ String[] volArray = volumes.split(",");
+ LinkedHashSet<String> deduplicated =
+ Arrays.stream(volArray).map(VolumeConfiguration::normalizeVolume)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ if (deduplicated.size() < volArray.length) {
throw new IllegalArgumentException(
- Property.INSTANCE_VOLUMES.getKey() + " contains duplicate volumes (" + ns + ")");
+ Property.INSTANCE_VOLUMES.getKey() + " contains duplicate volumes (" + volumes + ")");
}
return deduplicated;
}
- public static Set<String> prefix(Set<String> bases, String suffix) {
- String actualSuffix = suffix.startsWith("/") ? suffix.substring(1) : suffix;
- return bases.stream().map(base -> base + (base.endsWith("/") ? "" : "/") + actualSuffix)
- .collect(Collectors.toCollection(LinkedHashSet::new));
- }
-
- /**
- * Create a Volume with the given FileSystem that writes to the default path
- *
- * @param fs
- * A FileSystem to write to
- * @return A Volume instance writing to the given FileSystem in the default path
- */
- @SuppressWarnings("deprecation")
- public static <T extends FileSystem> Volume create(T fs, AccumuloConfiguration acuconf) {
- String dfsDir = acuconf.get(Property.INSTANCE_DFS_DIR);
- return new VolumeImpl(fs,
- dfsDir == null ? Property.INSTANCE_DFS_DIR.getDefaultValue() : dfsDir);
+ private static String normalizeVolume(String volume) {
+ if (volume == null || volume.isBlank() || !volume.contains(":")) {
+ throw new IllegalArgumentException("Expected fully qualified URI for "
+ + Property.INSTANCE_VOLUMES.getKey() + " got " + volume);
+ }
+ try {
+ // pass through URI to unescape hex encoded chars (e.g. convert %2C to "," char)
+ return new Path(new URI(volume.strip())).toString();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(Property.INSTANCE_VOLUMES.getKey() + " contains '" + volume
+ + "' which has a syntax error", e);
+ }
}
}
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
index 562a8f0..a302920 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
@@ -134,11 +134,6 @@ public class PropertyTest {
}
}
- @SuppressWarnings("deprecation")
- private Property getDeprecatedProperty() {
- return Property.INSTANCE_DFS_DIR;
- }
-
@Test
public void testAnnotations() {
assertTrue(Property.GENERAL_VOLUME_CHOOSER.isExperimental());
@@ -147,9 +142,10 @@ public class PropertyTest {
assertTrue(Property.INSTANCE_SECRET.isSensitive());
assertFalse(Property.INSTANCE_VOLUMES.isSensitive());
- assertTrue(getDeprecatedProperty().isDeprecated());
+ @SuppressWarnings("deprecation")
+ Property deprecatedProp = Property.GENERAL_CLASSPATHS;
+ assertTrue(deprecatedProp.isDeprecated());
assertFalse(Property.INSTANCE_VOLUMES_REPLACEMENTS.isDeprecated());
-
}
@Test
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
index 49fd665..8bc92d0 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/AccumuloCluster.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.cluster;
-import java.io.IOException;
import java.util.Properties;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -108,7 +107,7 @@ public interface AccumuloCluster {
/**
* @return the {@link FileSystem} in use by this cluster
*/
- FileSystem getFileSystem() throws IOException;
+ FileSystem getFileSystem();
/**
* @return A path on {@link FileSystem} this cluster is running on that can be used for temporary
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
index 145af13..bd74821 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -200,14 +201,18 @@ public class StandaloneAccumuloCluster implements AccumuloCluster {
}
@Override
- public FileSystem getFileSystem() throws IOException {
+ public FileSystem getFileSystem() {
Configuration conf = getHadoopConfiguration();
- return FileSystem.get(conf);
+ try {
+ return FileSystem.get(conf);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
@Override
public Path getTemporaryPath() {
- return tmp;
+ return getFileSystem().makeQualified(tmp);
}
public ClusterUser getUser(int offset) {
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index c5732c8..597f68f 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -357,8 +357,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
writeConfig(hdfsFile, conf);
Map<String,String> siteConfig = config.getSiteConfig();
- siteConfig.put(Property.INSTANCE_DFS_URI.getKey(), dfsUri);
- siteConfig.put(Property.INSTANCE_DFS_DIR.getKey(), "/accumulo");
+ siteConfig.put(Property.INSTANCE_VOLUMES.getKey(), dfsUri + "/accumulo");
config.setSiteConfig(siteConfig);
} else if (config.useExistingInstance()) {
dfsUri = config.getHadoopConfiguration().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY);
@@ -467,8 +466,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
throw new RuntimeException(e);
}
- String instanceIdFromFile =
- VolumeManager.getInstanceIDFromHdfs(instanceIdPath, cc, hadoopConf);
+ String instanceIdFromFile = VolumeManager.getInstanceIDFromHdfs(instanceIdPath, hadoopConf);
ZooReaderWriter zrw = new ZooReaderWriter(cc.get(Property.INSTANCE_ZK_HOST),
(int) cc.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), cc.get(Property.INSTANCE_SECRET));
@@ -793,13 +791,15 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
@Override
public Path getTemporaryPath() {
+ String p;
if (config.useMiniDFS()) {
- return new Path("/tmp/");
+ p = "/tmp/";
} else {
File tmp = new File(config.getDir(), "tmp");
mkdirs(tmp);
- return new Path(tmp.toString());
+ p = tmp.toString();
}
+ return getFileSystem().makeQualified(new Path(p));
}
@Override
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 7c11f8a..367bbd4 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -136,8 +136,7 @@ public class MiniAccumuloConfigImpl {
// Never want to override these if an existing instance, which may be using the defaults
if (existingInstance == null || !existingInstance) {
existingInstance = false;
- // TODO ACCUMULO-XXXX replace usage of instance.dfs.{dir,uri} with instance.volumes
- setInstanceLocation();
+ mergeProp(Property.INSTANCE_VOLUMES.getKey(), "file://" + accumuloDir.getAbsolutePath());
mergeProp(Property.INSTANCE_SECRET.getKey(), DEFAULT_INSTANCE_SECRET);
mergeProp(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey() + "password", getRootPassword());
}
@@ -230,12 +229,6 @@ public class MiniAccumuloConfigImpl {
}
}
- @SuppressWarnings("deprecation")
- private void setInstanceLocation() {
- mergeProp(Property.INSTANCE_DFS_URI.getKey(), "file:///");
- mergeProp(Property.INSTANCE_DFS_DIR.getKey(), accumuloDir.getAbsolutePath());
- }
-
/**
* Set a given key/value on the site config if it doesn't already exist
*/
diff --git a/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImplTest.java b/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImplTest.java
index e8c7bfb..d0943c4 100644
--- a/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImplTest.java
+++ b/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImplTest.java
@@ -37,9 +37,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
public class MiniAccumuloConfigImplTest {
- @SuppressWarnings("deprecation")
- private static final Property INSTANCE_DFS_URI = Property.INSTANCE_DFS_URI;
-
@Rule
public TemporaryFolder tempFolder =
new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
@@ -71,10 +68,10 @@ public class MiniAccumuloConfigImplTest {
// constructor site config overrides default props
Map<String,String> siteConfig = new HashMap<>();
- siteConfig.put(INSTANCE_DFS_URI.getKey(), "hdfs://");
+ siteConfig.put(Property.INSTANCE_VOLUMES.getKey(), "hdfs://");
MiniAccumuloConfigImpl config = new MiniAccumuloConfigImpl(tempFolder.getRoot(), "password")
.setSiteConfig(siteConfig).initialize();
- assertEquals("hdfs://", config.getSiteConfig().get(INSTANCE_DFS_URI.getKey()));
+ assertEquals("hdfs://", config.getSiteConfig().get(Property.INSTANCE_VOLUMES.getKey()));
}
@Test
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
index 66abc54..ae6c0da 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -93,8 +94,8 @@ public class ServerConstants {
public static synchronized Set<String> getBaseUris(AccumuloConfiguration conf,
Configuration hadoopConf) {
if (baseUris == null) {
- baseUris = Collections.unmodifiableSet(checkBaseUris(conf, hadoopConf,
- VolumeConfiguration.getVolumeUris(conf, hadoopConf), false));
+ baseUris = Collections.unmodifiableSet(
+ checkBaseUris(conf, hadoopConf, VolumeConfiguration.getVolumeUris(conf), false));
}
return baseUris;
@@ -115,7 +116,7 @@ public class ServerConstants {
String currentIid;
int currentVersion;
try {
- currentIid = VolumeManager.getInstanceIDFromHdfs(path, conf, hadoopConf);
+ currentIid = VolumeManager.getInstanceIDFromHdfs(path, hadoopConf);
Path vpath = new Path(baseDir, VERSION_DIR);
currentVersion =
ServerUtil.getAccumuloPersistentVersion(vpath.getFileSystem(hadoopConf), vpath);
@@ -155,12 +156,18 @@ public class ServerConstants {
public static final String RECOVERY_DIR = "recovery";
public static final String WAL_DIR = "wal";
+ private static Set<String> prefix(Set<String> bases, String suffix) {
+ String actualSuffix = suffix.startsWith("/") ? suffix.substring(1) : suffix;
+ return bases.stream().map(base -> base + (base.endsWith("/") ? "" : "/") + actualSuffix)
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
public static Set<String> getTablesDirs(ServerContext context) {
- return VolumeConfiguration.prefix(getBaseUris(context), TABLE_DIR);
+ return prefix(getBaseUris(context), TABLE_DIR);
}
public static Set<String> getRecoveryDirs(ServerContext context) {
- return VolumeConfiguration.prefix(getBaseUris(context), RECOVERY_DIR);
+ return prefix(getBaseUris(context), RECOVERY_DIR);
}
public static Path getInstanceIdLocation(Volume v) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java
index 51ef02a..97e8a9a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerInfo.java
@@ -94,7 +94,7 @@ public class ServerInfo implements ClientInfo {
throw new IllegalStateException(e);
}
Path instanceIdPath = ServerUtil.getAccumuloInstanceIdPath(volumeManager);
- instanceID = VolumeManager.getInstanceIDFromHdfs(instanceIdPath, config, hadoopConf);
+ instanceID = VolumeManager.getInstanceIDFromHdfs(instanceIdPath, hadoopConf);
zooKeepers = config.get(Property.INSTANCE_ZK_HOST);
zooKeepersSessionTimeOut = (int) config.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
zooCache = new ZooCacheFactory().getZooCache(zooKeepers, zooKeepersSessionTimeOut);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java b/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java
index 30a3d4e..71be74b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerUtil.java
@@ -102,8 +102,7 @@ public class ServerUtil {
public static synchronized Path getAccumuloInstanceIdPath(VolumeManager fs) {
// It doesn't matter which Volume is used as they should all have the instance ID stored
- Volume v = fs.getVolumes().iterator().next();
- return ServerConstants.getInstanceIdLocation(v);
+ return ServerConstants.getInstanceIdLocation(fs.getVolumes().iterator().next());
}
public static void init(ServerContext context, String application) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
index 2c8aad9..ba92d91 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java
@@ -60,7 +60,6 @@ import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.accumulo.server.security.SecurityOperation;
import org.apache.accumulo.server.util.ServerBulkImportStatus;
@@ -75,15 +74,12 @@ public class ClientServiceHandler implements ClientService.Iface {
private static final Logger log = LoggerFactory.getLogger(ClientServiceHandler.class);
protected final TransactionWatcher transactionWatcher;
protected final ServerContext context;
- protected final VolumeManager fs;
protected final SecurityOperation security;
private final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
- public ClientServiceHandler(ServerContext context, TransactionWatcher transactionWatcher,
- VolumeManager fs) {
+ public ClientServiceHandler(ServerContext context, TransactionWatcher transactionWatcher) {
this.context = context;
this.transactionWatcher = transactionWatcher;
- this.fs = fs;
this.security = AuditedSecurityOperation.getInstance(context);
}
@@ -463,7 +459,8 @@ public class ClientServiceHandler implements ClientService.Iface {
}
// use the same set of tableIds that were validated above to avoid race conditions
- Map<TreeSet<String>,Long> diskUsage = TableDiskUsage.getDiskUsage(tableIds, fs, context);
+ Map<TreeSet<String>,Long> diskUsage =
+ TableDiskUsage.getDiskUsage(tableIds, context.getVolumeManager(), context);
List<TDiskUsage> retUsages = new ArrayList<>();
for (Map.Entry<TreeSet<String>,Long> usageItem : diskUsage.entrySet()) {
retUsages.add(new TDiskUsage(new ArrayList<>(usageItem.getKey()), usageItem.getValue()));
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
index 2c315cf..ed2d4a1 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java
@@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.core.volume.VolumeConfiguration;
import org.apache.accumulo.server.ServerConstants;
@@ -131,7 +130,7 @@ public interface VolumeManager extends AutoCloseable {
// find the appropriate FileSystem object given a path
FileSystem getFileSystemByPath(Path path);
- // return the item in options that is in the same volume as source
+ // return the item in options that is in the same file system as source
Path matchingFileSystem(Path source, Set<String> options);
// forward to the appropriate FileSystem object
@@ -191,11 +190,10 @@ public interface VolumeManager extends AutoCloseable {
Logger log = LoggerFactory.getLogger(VolumeManager.class);
- static String getInstanceIDFromHdfs(Path instanceDirectory, AccumuloConfiguration conf,
- Configuration hadoopConf) {
+ static String getInstanceIDFromHdfs(Path instanceDirectory, Configuration hadoopConf) {
try {
- FileSystem fs = VolumeConfiguration.getVolume(instanceDirectory.toString(), hadoopConf, conf)
- .getFileSystem();
+ FileSystem fs =
+ VolumeConfiguration.fileSystemForPath(instanceDirectory.toString(), hadoopConf);
FileStatus[] files = null;
try {
files = fs.listStatus(instanceDirectory);
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index da62f86..1fed821 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -350,8 +350,10 @@ public class VolumeManagerImpl implements VolumeManager {
throws IOException {
final Map<String,Volume> volumes = new HashMap<>();
+ Set<String> volumeStrings = VolumeConfiguration.getVolumeUris(conf);
+
// The "default" Volume for Accumulo (in case no volumes are specified)
- for (String volumeUriOrDir : VolumeConfiguration.getVolumeUris(conf, hadoopConf)) {
+ for (String volumeUriOrDir : volumeStrings) {
if (volumeUriOrDir.isBlank())
throw new IllegalArgumentException("Empty volume specified in configuration");
@@ -367,7 +369,8 @@ public class VolumeManagerImpl implements VolumeManager {
}
}
- Volume defaultVolume = VolumeConfiguration.getDefaultVolume(hadoopConf, conf);
+ String uri = volumeStrings.iterator().next();
+ Volume defaultVolume = new VolumeImpl(new Path(uri), hadoopConf);
return new VolumeManagerImpl(volumes, defaultVolume, conf, hadoopConf);
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index d631983..dc6c263 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -260,14 +260,8 @@ public class Initialize implements KeywordExecutable {
static boolean checkInit(VolumeManager fs, SiteConfiguration sconf, Configuration hadoopConf)
throws IOException {
- @SuppressWarnings("deprecation")
- String fsUri = sconf.get(Property.INSTANCE_DFS_URI);
- if (fsUri.equals("")) {
- fsUri = FileSystem.getDefaultUri(hadoopConf).toString();
- }
- log.info("Hadoop Filesystem is {}", fsUri);
- log.info("Accumulo data dirs are {}",
- Arrays.asList(VolumeConfiguration.getVolumeUris(sconf, hadoopConf)));
+ log.info("Hadoop Filesystem is {}", FileSystem.getDefaultUri(hadoopConf));
+ log.info("Accumulo data dirs are {}", Arrays.asList(VolumeConfiguration.getVolumeUris(sconf)));
log.info("Zookeeper server is {}", sconf.get(Property.INSTANCE_ZK_HOST));
log.info("Checking if Zookeeper is available. If this hangs, then you need"
+ " to make sure zookeeper is running");
@@ -292,8 +286,8 @@ public class Initialize implements KeywordExecutable {
+ " accumulo.properties. Without this accumulo will not operate" + " correctly");
}
try {
- if (isInitialized(fs, sconf, hadoopConf)) {
- printInitializeFailureMessages(sconf, hadoopConf);
+ if (isInitialized(fs, sconf)) {
+ printInitializeFailureMessages(sconf);
return false;
}
} catch (IOException e) {
@@ -303,39 +297,18 @@ public class Initialize implements KeywordExecutable {
return true;
}
- static void printInitializeFailureMessages(SiteConfiguration sconf, Configuration hadoopConf) {
- @SuppressWarnings("deprecation")
- Property INSTANCE_DFS_DIR = Property.INSTANCE_DFS_DIR;
- @SuppressWarnings("deprecation")
- Property INSTANCE_DFS_URI = Property.INSTANCE_DFS_URI;
- String instanceDfsDir = sconf.get(INSTANCE_DFS_DIR);
- // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
- log.error("FATAL It appears the directories {}",
- Arrays.asList(VolumeConfiguration.getVolumeUris(sconf, hadoopConf))
- + " were previously initialized.");
- String instanceVolumes = sconf.get(Property.INSTANCE_VOLUMES);
- String instanceDfsUri = sconf.get(INSTANCE_DFS_URI);
-
- // ACCUMULO-3651 Changed level to error and added FATAL to message for slf4j compatibility
-
- if (!instanceVolumes.isEmpty()) {
- log.error("FATAL: Change the property {} to use different filesystems,",
- Property.INSTANCE_VOLUMES);
- } else if (!instanceDfsDir.isEmpty()) {
- log.error("FATAL: Change the property {} to use a different filesystem,", INSTANCE_DFS_URI);
- } else {
- log.error("FATAL: You are using the default URI for the filesystem. Set"
- + " the property {} to use a different filesystem,", Property.INSTANCE_VOLUMES);
- }
- log.error("FATAL: or change the property {} to use a different directory.", INSTANCE_DFS_DIR);
- log.error("FATAL: The current value of {} is |{}|", INSTANCE_DFS_URI, instanceDfsUri);
- log.error("FATAL: The current value of {} is |{}|", INSTANCE_DFS_DIR, instanceDfsDir);
- log.error("FATAL: The current value of {} is |{}|", Property.INSTANCE_VOLUMES, instanceVolumes);
+ static void printInitializeFailureMessages(SiteConfiguration sconf) {
+ // Report which volumes already hold an instance and which property selects them.
+ String volumesKey = Property.INSTANCE_VOLUMES.getKey();
+ log.error("It appears the directories {} were previously initialized.",
+ VolumeConfiguration.getVolumeUris(sconf));
+ log.error("Change the property {} to use different volumes.", volumesKey);
+ log.error("The current value of {} is |{}|", volumesKey, sconf.get(Property.INSTANCE_VOLUMES));
}
- public boolean doInit(SiteConfiguration siteConfig, Opts opts, Configuration conf,
+ public boolean doInit(SiteConfiguration siteConfig, Opts opts, Configuration hadoopConf,
VolumeManager fs) throws IOException {
- if (!checkInit(fs, siteConfig, conf)) {
+ if (!checkInit(fs, siteConfig, hadoopConf)) {
return false;
}
@@ -364,15 +337,9 @@ public class Initialize implements KeywordExecutable {
opts.rootpass = getRootPassword(siteConfig, opts, rootUser);
}
- return initialize(siteConfig, conf, opts, instanceNamePath, fs, rootUser);
- }
-
- private boolean initialize(SiteConfiguration siteConfig, Configuration hadoopConf, Opts opts,
- String instanceNamePath, VolumeManager fs, String rootUser) {
-
UUID uuid = UUID.randomUUID();
// the actual disk locations of the root table and tablets
- Set<String> configuredVolumes = VolumeConfiguration.getVolumeUris(siteConfig, hadoopConf);
+ Set<String> configuredVolumes = VolumeConfiguration.getVolumeUris(siteConfig);
String instanceName = instanceNamePath.substring(getInstanceNamePrefix().length());
try (ServerContext context =
@@ -401,24 +368,6 @@ public class Initialize implements KeywordExecutable {
rootTabletFileUri, context);
} catch (Exception e) {
log.error("FATAL Failed to initialize filesystem", e);
-
- if (siteConfig.get(Property.INSTANCE_VOLUMES).trim().equals("")) {
-
- final String defaultFsUri = "file:///";
- String fsDefaultName = hadoopConf.get("fs.default.name", defaultFsUri),
- fsDefaultFS = hadoopConf.get("fs.defaultFS", defaultFsUri);
-
- // Try to determine when we couldn't find an appropriate core-site.xml on the classpath
- if (defaultFsUri.equals(fsDefaultName) && defaultFsUri.equals(fsDefaultFS)) {
- log.error(
- "FATAL: Default filesystem value ('fs.defaultFS' or"
- + " 'fs.default.name') of '{}' was found in the Hadoop configuration",
- defaultFsUri);
- log.error("FATAL: Please ensure that the Hadoop core-site.xml is on"
- + " the classpath using 'general.classpaths' in accumulo.properties");
- }
- }
-
return false;
}
@@ -513,7 +462,7 @@ public class Initialize implements KeywordExecutable {
private void initFileSystem(SiteConfiguration siteConfig, Configuration hadoopConf,
VolumeManager fs, UUID uuid, String rootTabletDirUri, String rootTabletFileUri,
ServerContext serverContext) throws IOException {
- initDirs(fs, uuid, VolumeConfiguration.getVolumeUris(siteConfig, hadoopConf), false);
+ initDirs(fs, uuid, VolumeConfiguration.getVolumeUris(siteConfig), false);
// initialize initial system tables config in zookeeper
initSystemTablesConfig(zoo, Constants.ZROOT + "/" + uuid, hadoopConf);
@@ -896,9 +845,9 @@ public class Initialize implements KeywordExecutable {
initialRootMetaConf.put(Property.TABLE_FILE_REPLICATION.getKey(), rep);
}
- public static boolean isInitialized(VolumeManager fs, SiteConfiguration siteConfig,
- Configuration hadoopConf) throws IOException {
- for (String baseDir : VolumeConfiguration.getVolumeUris(siteConfig, hadoopConf)) {
+ public static boolean isInitialized(VolumeManager fs, SiteConfiguration siteConfig)
+ throws IOException {
+ for (String baseDir : VolumeConfiguration.getVolumeUris(siteConfig)) {
if (fs.exists(new Path(baseDir, ServerConstants.INSTANCE_ID_DIR))
|| fs.exists(new Path(baseDir, ServerConstants.VERSION_DIR))) {
return true;
@@ -911,7 +860,7 @@ public class Initialize implements KeywordExecutable {
private static void addVolumes(VolumeManager fs, SiteConfiguration siteConfig,
Configuration hadoopConf) throws IOException {
- Set<String> volumeURIs = VolumeConfiguration.getVolumeUris(siteConfig, hadoopConf);
+ Set<String> volumeURIs = VolumeConfiguration.getVolumeUris(siteConfig);
Set<String> initializedDirs =
ServerConstants.checkBaseUris(siteConfig, hadoopConf, volumeURIs, true);
@@ -924,8 +873,7 @@ public class Initialize implements KeywordExecutable {
Path iidPath = new Path(aBasePath, ServerConstants.INSTANCE_ID_DIR);
Path versionPath = new Path(aBasePath, ServerConstants.VERSION_DIR);
- UUID uuid =
- UUID.fromString(VolumeManager.getInstanceIDFromHdfs(iidPath, siteConfig, hadoopConf));
+ UUID uuid = UUID.fromString(VolumeManager.getInstanceIDFromHdfs(iidPath, hadoopConf));
for (Pair<Path,Path> replacementVolume : ServerConstants.getVolumeReplacements(siteConfig,
hadoopConf)) {
if (aBasePath.equals(replacementVolume.getFirst())) {
@@ -1005,7 +953,7 @@ public class Initialize implements KeywordExecutable {
if (opts.resetSecurity) {
log.info("Resetting security on accumulo.");
try (ServerContext context = new ServerContext(siteConfig)) {
- if (isInitialized(fs, siteConfig, hadoopConfig)) {
+ if (isInitialized(fs, siteConfig)) {
if (!opts.forceResetSecurity) {
ConsoleReader c = getConsoleReader();
String userEnteredName = c.readLine("WARNING: This will remove all"
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java
index 701e6d6..15dfc14 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooKeeperMain.java
@@ -20,11 +20,8 @@ package org.apache.accumulo.server.util;
import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.conf.SiteConfiguration;
-import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.start.spi.KeywordExecutable;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import com.beust.jcommander.Parameter;
import com.google.auto.service.AutoService;
@@ -67,10 +64,6 @@ public class ZooKeeperMain implements KeywordExecutable {
Opts opts = new Opts();
opts.parseArgs(ZooKeeperMain.class.getName(), args);
try (var context = new ServerContext(SiteConfiguration.auto())) {
- FileSystem fs = context.getVolumeManager().getDefaultVolume().getFileSystem();
- String baseDir = ServerConstants.getBaseUris(context).iterator().next();
- System.out.println("Using " + fs.makeQualified(new Path(baseDir + "/instance_id"))
- + " to lookup accumulo instance");
if (opts.servers == null) {
opts.servers = context.getZooKeepers();
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
index d3267ab..0603cef 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
@@ -72,15 +72,14 @@ public class ZooZap {
try {
var siteConf = SiteConfiguration.auto();
- Configuration hadoopConf = new Configuration();
// Login as the server on secure HDFS
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
SecurityUtil.serverLogin(siteConf);
}
- String volDir = VolumeConfiguration.getVolumeUris(siteConf, hadoopConf).iterator().next();
+ String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next();
Path instanceDir = new Path(volDir, "instance_id");
- String iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, siteConf, hadoopConf);
+ String iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration());
ZooReaderWriter zoo = new ZooReaderWriter(siteConf);
if (opts.zapMaster) {
diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index 18b4fb5..566596d 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -31,7 +31,9 @@ import java.util.TreeSet;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.TabletLocator;
import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoServiceFactory;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -112,8 +114,9 @@ public class BulkImporterTest {
MockTabletLocator locator = new MockTabletLocator();
FileSystem fs = FileSystem.getLocal(new Configuration());
ServerContext context = EasyMock.createMock(ServerContext.class);
- EasyMock.expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance())
- .anyTimes();
+ ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance());
+ conf.set(Property.INSTANCE_VOLUMES, "file:///");
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes();
EasyMock.expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
.anyTimes();
EasyMock.replay(context);
diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
index c02714c..1fbddeb 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
@@ -38,7 +38,6 @@ public class VolumeManagerImplTest {
public void invalidChooserConfigured() throws Exception {
List<String> volumes = Arrays.asList("file://one/", "file://two/", "file://three/");
ConfigurationCopy conf = new ConfigurationCopy();
- conf.set(INSTANCE_DFS_URI, volumes.get(0));
conf.set(Property.INSTANCE_VOLUMES, String.join(",", volumes));
conf.set(Property.GENERAL_VOLUME_CHOOSER,
"org.apache.accumulo.server.fs.ChooserThatDoesntExist");
@@ -64,15 +63,11 @@ public class VolumeManagerImplTest {
}
}
- @SuppressWarnings("deprecation")
- private static final Property INSTANCE_DFS_URI = Property.INSTANCE_DFS_URI;
-
// Expected to throw a runtime exception when the WrongVolumeChooser picks an invalid volume.
@Test
public void chooseFromOptions() throws Exception {
Set<String> volumes = Set.of("file://one/", "file://two/", "file://three/");
ConfigurationCopy conf = new ConfigurationCopy();
- conf.set(INSTANCE_DFS_URI, volumes.iterator().next());
conf.set(Property.INSTANCE_VOLUMES, String.join(",", volumes));
conf.set(Property.GENERAL_VOLUME_CHOOSER, WrongVolumeChooser.class.getName());
try (var vm = VolumeManagerImpl.get(conf, hadoopConf)) {
diff --git a/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java b/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
index 0ebb3a1..2011d41 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java
@@ -21,9 +21,10 @@ package org.apache.accumulo.server.init;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -48,14 +49,15 @@ public class InitializeTest {
private ZooReaderWriter zooOrig;
private ZooReaderWriter zoo;
- @SuppressWarnings("deprecation")
@Before
public void setUp() {
- conf = createMock(Configuration.class);
+ conf = new Configuration(false);
fs = createMock(VolumeManager.class);
sconf = createMock(SiteConfiguration.class);
- expect(sconf.get(Property.INSTANCE_VOLUMES)).andReturn("").anyTimes();
- expect(sconf.get(Property.INSTANCE_DFS_DIR)).andReturn("/bar").anyTimes();
+ expect(sconf.get(Property.INSTANCE_VOLUMES))
+ .andReturn("hdfs://foo/accumulo,hdfs://bar/accumulo").anyTimes();
+ expect(sconf.get(Property.INSTANCE_SECRET))
+ .andReturn(Property.INSTANCE_SECRET.getDefaultValue()).anyTimes();
expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1").anyTimes();
zoo = createMock(ZooReaderWriter.class);
zooOrig = Initialize.getZooReaderWriter();
@@ -65,83 +67,54 @@ public class InitializeTest {
@After
public void tearDown() {
Initialize.setZooReaderWriter(zooOrig);
+ verify(sconf, zoo, fs);
}
- @SuppressWarnings("deprecation")
@Test
public void testIsInitialized_HasInstanceId() throws Exception {
- expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo");
expect(fs.exists(anyObject(Path.class))).andReturn(true);
- replay(fs, sconf);
- assertTrue(Initialize.isInitialized(fs, sconf, conf));
+ replay(sconf, zoo, fs);
+ assertTrue(Initialize.isInitialized(fs, sconf));
}
- @SuppressWarnings("deprecation")
@Test
public void testIsInitialized_HasDataVersion() throws Exception {
- expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo");
expect(fs.exists(anyObject(Path.class))).andReturn(false);
expect(fs.exists(anyObject(Path.class))).andReturn(true);
- replay(fs, sconf);
- assertTrue(Initialize.isInitialized(fs, sconf, conf));
+ replay(sconf, zoo, fs);
+ assertTrue(Initialize.isInitialized(fs, sconf));
}
- @SuppressWarnings("deprecation")
@Test
public void testCheckInit_NoZK() throws Exception {
- expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo");
- expectLastCall().anyTimes();
- replay(sconf);
expect(zoo.exists("/")).andReturn(false);
- replay(zoo);
-
+ replay(sconf, zoo, fs);
assertFalse(Initialize.checkInit(fs, sconf, conf));
}
- @SuppressWarnings("deprecation")
@Test
public void testCheckInit_AlreadyInit() throws Exception {
- expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo").anyTimes();
- expect(sconf.get(Property.INSTANCE_SECRET))
- .andReturn(Property.INSTANCE_SECRET.getDefaultValue());
- replay(sconf);
expect(zoo.exists("/")).andReturn(true);
- replay(zoo);
expect(fs.exists(anyObject(Path.class))).andReturn(true);
- replay(fs);
-
+ replay(sconf, zoo, fs);
assertFalse(Initialize.checkInit(fs, sconf, conf));
}
- @SuppressWarnings("deprecation")
- @Test(expected = IOException.class)
+ @Test
public void testCheckInit_FSException() throws Exception {
- expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo");
- expectLastCall().anyTimes();
- expect(sconf.get(Property.INSTANCE_SECRET))
- .andReturn(Property.INSTANCE_SECRET.getDefaultValue());
- replay(sconf);
expect(zoo.exists("/")).andReturn(true);
- replay(zoo);
expect(fs.exists(anyObject(Path.class))).andThrow(new IOException());
- replay(fs);
-
- Initialize.checkInit(fs, sconf, conf);
+ replay(sconf, zoo, fs);
+ assertThrows(IOException.class, () -> Initialize.checkInit(fs, sconf, conf));
}
- @SuppressWarnings("deprecation")
@Test
public void testCheckInit_OK() throws Exception {
- expect(sconf.get(Property.INSTANCE_DFS_URI)).andReturn("hdfs://foo").anyTimes();
- expect(sconf.get(Property.INSTANCE_SECRET))
- .andReturn(Property.INSTANCE_SECRET.getDefaultValue()).anyTimes();
- replay(sconf);
expect(zoo.exists("/")).andReturn(true);
- replay(zoo);
- expect(fs.exists(anyObject(Path.class))).andReturn(false);
- expect(fs.exists(anyObject(Path.class))).andReturn(false);
- replay(fs);
-
+ // the initialization check calls fs.exists twice for each of the two configured volumes:
+ // once for instance_id, and once for version
+ expect(fs.exists(anyObject(Path.class))).andReturn(false).times(4);
+ replay(sconf, zoo, fs);
assertTrue(Initialize.checkInit(fs, sconf, conf));
}
}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/FileUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/FileUtilTest.java
index 5380128..01353d2 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/FileUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/FileUtilTest.java
@@ -39,8 +39,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
public class FileUtilTest {
- @SuppressWarnings("deprecation")
- private static Property INSTANCE_DFS_DIR = Property.INSTANCE_DFS_DIR;
@Rule
public TemporaryFolder tmpDir =
@@ -64,7 +62,7 @@ public class FileUtilTest {
Path tmpPath1 = new Path(tmp1.toURI());
HashMap<Property,String> testProps = new HashMap<>();
- testProps.put(INSTANCE_DFS_DIR, accumuloDir.getAbsolutePath());
+ testProps.put(Property.INSTANCE_VOLUMES, accumuloDir.getAbsolutePath());
try (var fs = VolumeManagerImpl.getLocalForTesting(accumuloDir.getAbsolutePath())) {
FileUtil.cleanupIndexOp(tmpPath1, fs, new ArrayList<>());
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index c63f1a7..ad08b52 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -383,7 +383,7 @@ public class TServerUtilsTest {
expect(ctx.getSaslParams()).andReturn(null).anyTimes();
expect(ctx.getClientTimeoutInMillis()).andReturn((long) 1000).anyTimes();
replay(ctx);
- ClientServiceHandler clientHandler = new ClientServiceHandler(ctx, null, null);
+ ClientServiceHandler clientHandler = new ClientServiceHandler(ctx, null);
Iface rpcProxy = TraceUtil.wrapService(clientHandler);
Processor<Iface> processor = new Processor<>(rpcProxy);
// "localhost" explicitly to make sure we can always bind to that interface (avoids DNS
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
index 9e5b736..370aa71 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
@@ -158,15 +158,13 @@ public class BulkImport extends MasterRepo {
private static Path createNewBulkDir(ServerContext context, VolumeManager fs, String sourceDir,
TableId tableId) throws IOException {
- Path tempPath =
+ Path tableDir =
fs.matchingFileSystem(new Path(sourceDir), ServerConstants.getTablesDirs(context));
- if (tempPath == null)
- throw new IOException(sourceDir + " is not in a volume configured for Accumulo");
-
- String tableDir = tempPath.toString();
if (tableDir == null)
- throw new IOException(sourceDir + " is not in a volume configured for Accumulo");
- Path directory = new Path(tableDir + "/" + tableId);
+ throw new IOException(
+ sourceDir + " is not in the same file system as any volume configured for Accumulo");
+
+ Path directory = new Path(tableDir, tableId.canonical());
fs.mkdirs(directory);
// only one should be able to create the lock file
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
index 37576ab..f518333 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
@@ -235,15 +235,13 @@ public class PrepBulkImport extends MasterRepo {
private Path createNewBulkDir(ServerContext context, VolumeManager fs, TableId tableId)
throws IOException {
- Path tempPath =
+ Path tableDir =
fs.matchingFileSystem(new Path(bulkInfo.sourceDir), ServerConstants.getTablesDirs(context));
- if (tempPath == null)
- throw new IOException(bulkInfo.sourceDir + " is not in a volume configured for Accumulo");
-
- String tableDir = tempPath.toString();
if (tableDir == null)
- throw new IOException(bulkInfo.sourceDir + " is not in a volume configured for Accumulo");
- Path directory = new Path(tableDir + "/" + tableId);
+ throw new IOException(bulkInfo.sourceDir
+ + " is not in the same file system as any volume configured for Accumulo");
+
+ Path directory = new Path(tableDir, tableId.canonical());
fs.mkdirs(directory);
UniqueNameAllocator namer = context.getUniqueNameAllocator();
diff --git a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/tableImport/CreateImportDir.java b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/tableImport/CreateImportDir.java
index 2a6d2af..0c0071e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/master/tableOps/tableImport/CreateImportDir.java
+++ b/server/manager/src/main/java/org/apache/accumulo/master/tableOps/tableImport/CreateImportDir.java
@@ -72,7 +72,8 @@ class CreateImportDir extends MasterRepo {
log.info("Looking for matching filesystem for {} from options {}", exportDir, tableDirs);
Path base = master.getVolumeManager().matchingFileSystem(exportDir, tableDirs);
if (base == null) {
- throw new IOException(dm.exportDir + " is not in a volume configured for Accumulo");
+ throw new IOException(
+ dm.exportDir + " is not in the same file system as any volume configured for Accumulo");
}
log.info("Chose base table directory of {}", base);
Path directory = new Path(base, tableInfo.tableId.canonical());
diff --git a/server/manager/src/test/java/org/apache/accumulo/master/upgrade/RootFilesUpgradeTest.java b/server/manager/src/test/java/org/apache/accumulo/master/upgrade/RootFilesUpgradeTest.java
index af9b9a0..c429cdb 100644
--- a/server/manager/src/test/java/org/apache/accumulo/master/upgrade/RootFilesUpgradeTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/master/upgrade/RootFilesUpgradeTest.java
@@ -158,13 +158,11 @@ public class RootFilesUpgradeTest {
}
}
- @SuppressWarnings("deprecation")
@Test
public void testFileReplacement() throws IOException {
ConfigurationCopy conf = new ConfigurationCopy();
- conf.set(Property.INSTANCE_DFS_URI, "file:///");
- conf.set(Property.INSTANCE_DFS_DIR, "/");
+ conf.set(Property.INSTANCE_VOLUMES, "file:///");
conf.set(Property.GENERAL_VOLUME_CHOOSER, RandomVolumeChooser.class.getName());
try (var vm = VolumeManagerImpl.get(conf, new Configuration())) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 7087a07..82e2eb9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -202,8 +202,6 @@ public class TabletServer extends AbstractServer {
private final AtomicLong flushCounter = new AtomicLong(0);
private final AtomicLong syncCounter = new AtomicLong(0);
- private final VolumeManager fs;
-
final OnlineTablets onlineTablets = new OnlineTablets();
final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<>());
final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<>());
@@ -248,13 +246,12 @@ public class TabletServer extends AbstractServer {
ServerContext context = super.getContext();
context.setupCrypto();
this.masterLockCache = new ZooCache(context.getZooReaderWriter(), null);
- this.fs = context.getVolumeManager();
final AccumuloConfiguration aconf = getConfiguration();
log.info("Version " + Constants.VERSION);
log.info("Instance " + getInstanceID());
this.sessionManager = new SessionManager(aconf);
- this.logSorter = new LogSorter(context, fs, aconf);
- this.replWorker = new ReplicationWorker(context, fs);
+ this.logSorter = new LogSorter(context, aconf);
+ this.replWorker = new ReplicationWorker(context);
this.statsKeeper = new TabletStatsKeeper();
final int numBusyTabletsToLog = aconf.getCount(Property.TSERV_LOG_BUSY_TABLETS_COUNT);
final long logBusyTabletsDelay =
@@ -866,7 +863,7 @@ public class TabletServer extends AbstractServer {
try {
log.debug("Closing filesystem");
- fs.close();
+ getFileSystem().close();
} catch (IOException e) {
log.warn("Failed to close filesystem : {}", e.getMessage(), e);
}
@@ -962,13 +959,13 @@ public class TabletServer extends AbstractServer {
}
}
- private void checkWalCanSync(ServerContext context) {
+ private static void checkWalCanSync(ServerContext context) {
VolumeChooserEnvironment chooserEnv =
new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.ChooserScope.LOGGER, context);
Set<String> prefixes;
var options = ServerConstants.getBaseUris(context);
try {
- prefixes = fs.choosable(chooserEnv, options);
+ prefixes = context.getVolumeManager().choosable(chooserEnv, options);
} catch (RuntimeException e) {
log.warn("Unable to determine if WAL directories ({}) support sync or flush. "
+ "Data loss may occur.", Arrays.asList(options), e);
@@ -978,7 +975,7 @@ public class TabletServer extends AbstractServer {
boolean warned = false;
for (String prefix : prefixes) {
String logPath = prefix + Path.SEPARATOR + ServerConstants.WAL_DIR;
- if (!fs.canSyncAndFlush(new Path(logPath))) {
+ if (!context.getVolumeManager().canSyncAndFlush(new Path(logPath))) {
// sleep a few seconds in case this is at cluster start...give monitor
// time to start so the warning will be more visible
if (!warned) {
@@ -1198,7 +1195,7 @@ public class TabletServer extends AbstractServer {
@Override
public VolumeManager getFileSystem() {
- return fs;
+ return TabletServer.this.getFileSystem();
}
@Override
@@ -1217,7 +1214,7 @@ public class TabletServer extends AbstractServer {
}
public VolumeManager getFileSystem() {
- return fs;
+ return getContext().getVolumeManager();
}
public int getOpeningCount() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
index 1f83656..58ffcb8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
@@ -121,6 +121,7 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.data.ServerMutation;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.util.Halt;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
@@ -167,7 +168,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
private final RowLocks rowLocks = new RowLocks();
ThriftClientHandler(TabletServer server) {
- super(server.getContext(), new TransactionWatcher(server.getContext()), server.getFileSystem());
+ super(server.getContext(), new TransactionWatcher(server.getContext()));
this.server = server;
log.debug("{} created", ThriftClientHandler.class.getName());
}
@@ -192,7 +193,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
Map<TabletFile,MapFileInfo> fileRefMap = new HashMap<>();
for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
Path path = new Path(mapping.getKey());
- FileSystem ns = fs.getFileSystemByPath(path);
+ FileSystem ns = context.getVolumeManager().getFileSystemByPath(path);
path = ns.makeQualified(path);
fileRefMap.put(new TabletFile(path), mapping.getValue());
}
@@ -234,7 +235,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
Map<TabletFile,MapFileInfo> newFileMap = new HashMap<>();
for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
Path path = new Path(dir, mapping.getKey());
- FileSystem ns = fs.getFileSystemByPath(path);
+ FileSystem ns = context.getVolumeManager().getFileSystemByPath(path);
path = ns.makeQualified(path);
newFileMap.put(new TabletFile(path), mapping.getValue());
}
@@ -260,7 +261,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
return null;
}
- return server.getContext().getTableConfiguration(extent.tableId()).getScanDispatcher();
+ return context.getTableConfiguration(extent.tableId()).getScanDispatcher();
}
@Override
@@ -995,7 +996,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
final CompressedIterators compressedIters = new CompressedIterators(symbols);
ConditionCheckerContext checkerContext = new ConditionCheckerContext(server.getContext(),
- compressedIters, server.getContext().getTableConfiguration(cs.tableId));
+ compressedIters, context.getTableConfiguration(cs.tableId));
while (iter.hasNext()) {
final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next();
@@ -1323,7 +1324,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
}
} catch (ThriftSecurityException e) {
log.warn("Got {} message from unauthenticatable user: {}", request, e.getUser());
- if (server.getContext().getCredentials().getToken().getClass().getName()
+ if (context.getCredentials().getToken().getClass().getName()
.equals(credentials.getTokenClassName())) {
log.error("Got message from a service with a mismatched configuration."
+ " Please ensure a compatible configuration.", e);
@@ -1347,7 +1348,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
if (lock != null) {
ZooUtil.LockID lid =
- new ZooUtil.LockID(server.getContext().getZooKeeperRoot() + Constants.ZMASTER_LOCK, lock);
+ new ZooUtil.LockID(context.getZooKeeperRoot() + Constants.ZMASTER_LOCK, lock);
try {
if (!ZooLock.isLockHeld(server.masterLockCache, lid)) {
@@ -1715,8 +1716,7 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
ExecutorService es = server.resourceManager.getSummaryPartitionExecutor();
Future<SummaryCollection> future = new Gatherer(server.getContext(), request,
- server.getContext().getTableConfiguration(tableId), server.getContext().getCryptoService())
- .gather(es);
+ context.getTableConfiguration(tableId), context.getCryptoService()).gather(es);
return startSummaryOperation(credentials, future);
}
@@ -1733,9 +1733,10 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
ExecutorService spe = server.resourceManager.getSummaryRemoteExecutor();
TableConfiguration tableConfig =
- server.getContext().getTableConfiguration(TableId.of(request.getTableId()));
- Future<SummaryCollection> future = new Gatherer(server.getContext(), request, tableConfig,
- server.getContext().getCryptoService()).processPartition(spe, modulus, remainder);
+ context.getTableConfiguration(TableId.of(request.getTableId()));
+ Future<SummaryCollection> future =
+ new Gatherer(server.getContext(), request, tableConfig, context.getCryptoService())
+ .processPartition(spe, modulus, remainder);
return startSummaryOperation(credentials, future);
}
@@ -1751,14 +1752,14 @@ class ThriftClientHandler extends ClientServiceHandler implements TabletClientSe
}
ExecutorService srp = server.resourceManager.getSummaryRetrievalExecutor();
- TableConfiguration tableCfg =
- server.getContext().getTableConfiguration(TableId.of(request.getTableId()));
+ TableConfiguration tableCfg = context.getTableConfiguration(TableId.of(request.getTableId()));
BlockCache summaryCache = server.resourceManager.getSummaryCache();
BlockCache indexCache = server.resourceManager.getIndexCache();
Cache<String,Long> fileLenCache = server.resourceManager.getFileLenCache();
+ VolumeManager fs = context.getVolumeManager();
FileSystemResolver volMgr = fs::getFileSystemByPath;
Future<SummaryCollection> future =
- new Gatherer(server.getContext(), request, tableCfg, server.getContext().getCryptoService())
+ new Gatherer(server.getContext(), request, tableCfg, context.getCryptoService())
.processFiles(volMgr, files, summaryCache, indexCache, fileLenCache, srp);
return startSummaryOperation(credentials, future);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index e0d52ce..a024115 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -31,12 +31,12 @@ import java.util.Map.Entry;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.master.thrift.RecoveryStatus;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
public class LogSorter {
private static final Logger log = LoggerFactory.getLogger(LogSorter.class);
- VolumeManager fs;
AccumuloConfiguration conf;
private final Map<String,LogProcessor> currentWork = Collections.synchronizedMap(new HashMap<>());
@@ -104,6 +103,8 @@ public class LogSorter {
sortStart = System.currentTimeMillis();
}
+ VolumeManager fs = context.getVolumeManager();
+
String formerThreadName = Thread.currentThread().getName();
int part = 0;
try {
@@ -177,7 +178,7 @@ public class LogSorter {
private void writeBuffer(String destPath, List<Pair<LogFileKey,LogFileValue>> buffer, int part)
throws IOException {
Path path = new Path(destPath, String.format("part-r-%05d", part));
- FileSystem ns = fs.getFileSystemByPath(path);
+ FileSystem ns = context.getVolumeManager().getFileSystemByPath(path);
try (MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns.makeQualified(path),
MapFile.Writer.keyClass(LogFileKey.class),
@@ -215,12 +216,11 @@ public class LogSorter {
}
ThreadPoolExecutor threadPool;
- private final ClientContext context;
+ private final ServerContext context;
private double walBlockSize;
- public LogSorter(ClientContext context, VolumeManager fs, AccumuloConfiguration conf) {
+ public LogSorter(ServerContext context, AccumuloConfiguration conf) {
this.context = context;
- this.fs = fs;
this.conf = conf;
int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 4bac615..b6d6e1f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
@@ -34,7 +33,6 @@ import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
import org.apache.accumulo.server.replication.ReplicaSystem;
import org.apache.accumulo.server.replication.ReplicaSystemFactory;
@@ -56,22 +54,18 @@ public class ReplicationProcessor implements Processor {
private static final Logger log = LoggerFactory.getLogger(ReplicationProcessor.class);
private final ServerContext context;
- private final AccumuloConfiguration conf;
- private final VolumeManager fs;
private final ReplicaSystemHelper helper;
private final ReplicaSystemFactory factory;
- public ReplicationProcessor(ServerContext context, AccumuloConfiguration conf, VolumeManager fs) {
+ public ReplicationProcessor(ServerContext context) {
this.context = context;
- this.conf = conf;
- this.fs = fs;
this.helper = new ReplicaSystemHelper(context);
this.factory = new ReplicaSystemFactory();
}
@Override
public ReplicationProcessor newProcessor() {
- return new ReplicationProcessor(context, context.getConfiguration(), fs);
+ return new ReplicationProcessor(context);
}
@Override
@@ -157,7 +151,7 @@ public class ReplicationProcessor implements Processor {
protected String getPeerType(String peerName) {
// Find the configured replication peer so we know how to replicate to it
Map<String,String> configuredPeers =
- conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
+ context.getConfiguration().getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
String peerType = configuredPeers.get(Property.REPLICATION_PEERS.getKey() + peerName);
if (peerType == null) {
String msg = "Cannot process replication for unknown peer: " + peerName;
@@ -169,7 +163,7 @@ public class ReplicationProcessor implements Processor {
}
protected boolean doesFileExist(Path filePath, ReplicationTarget target) throws IOException {
- if (!fs.exists(filePath)) {
+ if (!context.getVolumeManager().exists(filePath)) {
log.warn("Received work request for {} and {}, but the file doesn't exist", filePath, target);
return false;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index 47c09d7..c9d03bb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -25,7 +25,6 @@ import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.replication.ReplicationConstants;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -37,15 +36,11 @@ import org.slf4j.LoggerFactory;
public class ReplicationWorker implements Runnable {
private static final Logger log = LoggerFactory.getLogger(ReplicationWorker.class);
- private ServerContext context;
- private AccumuloConfiguration conf;
- private VolumeManager fs;
+ private final ServerContext context;
private ThreadPoolExecutor executor;
- public ReplicationWorker(ServerContext context, VolumeManager fs) {
+ public ReplicationWorker(ServerContext context) {
this.context = context;
- this.fs = fs;
- this.conf = context.getConfiguration();
}
public void setExecutor(ThreadPoolExecutor executor) {
@@ -57,6 +52,7 @@ public class ReplicationWorker implements Runnable {
DefaultConfiguration defaultConf = DefaultConfiguration.getInstance();
long defaultDelay = defaultConf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_DELAY);
long defaultPeriod = defaultConf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_PERIOD);
+ AccumuloConfiguration conf = context.getConfiguration();
long delay = conf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_DELAY);
long period = conf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_PERIOD);
try {
@@ -72,7 +68,7 @@ public class ReplicationWorker implements Runnable {
context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf);
}
- workQueue.startProcessing(new ReplicationProcessor(context, conf, fs), executor);
+ workQueue.startProcessing(new ReplicationProcessor(context), executor);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
index d107e8a..ff99171 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/TestUpgradePathForWALogs.java
@@ -18,6 +18,10 @@
*/
package org.apache.accumulo.tserver.log;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -28,6 +32,7 @@ import java.io.OutputStream;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.log.SortedLogState;
@@ -47,8 +52,8 @@ public class TestUpgradePathForWALogs {
private static final String WALOG_FROM_15 = "/walog-from-15.walog";
private static final String WALOG_FROM_16 = "/walog-from-16.walog";
- AccumuloConfiguration config = DefaultConfiguration.getInstance();
- VolumeManager fs;
+ private static final AccumuloConfiguration config = DefaultConfiguration.getInstance();
+ private ServerContext context;
@Rule
public TemporaryFolder tempFolder =
@@ -56,18 +61,22 @@ public class TestUpgradePathForWALogs {
@Before
public void setUp() throws Exception {
+ context = createMock(ServerContext.class);
File workDir = tempFolder.newFolder();
String path = workDir.getAbsolutePath();
assertTrue(workDir.delete());
- fs = VolumeManagerImpl.getLocalForTesting(path);
+ VolumeManager fs = VolumeManagerImpl.getLocalForTesting(path);
Path manyMapsPath = new Path("file://" + path);
fs.mkdirs(manyMapsPath);
fs.create(SortedLogState.getFinishedMarkerPath(manyMapsPath)).close();
+ expect(context.getVolumeManager()).andReturn(fs).anyTimes();
+ replay(context);
}
@After
public void tearDown() throws IOException {
- fs.close();
+ context.getVolumeManager().close();
+ verify(context);
}
@Test
@@ -86,7 +95,7 @@ public class TestUpgradePathForWALogs {
walogInHDFStream.close();
walogInHDFStream = null;
- LogSorter logSorter = new LogSorter(null, fs, config);
+ LogSorter logSorter = new LogSorter(context, config);
LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor();
logProcessor.sort(WALOG_FROM_15,
@@ -122,7 +131,7 @@ public class TestUpgradePathForWALogs {
walogInHDFStream.close();
walogInHDFStream = null;
- LogSorter logSorter = new LogSorter(null, fs, config);
+ LogSorter logSorter = new LogSorter(context, config);
LogSorter.LogProcessor logProcessor = logSorter.new LogProcessor();
logProcessor.sort(walogToTest,
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index 7906684..06e234a 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -19,9 +19,13 @@
package org.apache.accumulo.tserver.replication;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
-import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.conf.ConfigurationCopy;
@@ -29,52 +33,48 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
import org.apache.accumulo.server.replication.ReplicaSystem;
import org.apache.accumulo.server.replication.ReplicaSystemHelper;
import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.hadoop.fs.Path;
-import org.easymock.EasyMock;
import org.junit.Test;
public class ReplicationProcessorTest {
@Test
public void peerTypeExtractionFromConfiguration() {
- VolumeManager fs = EasyMock.createMock(VolumeManager.class);
- ServerContext context = EasyMock.createMock(ServerContext.class);
-
- Map<String,String> data = new HashMap<>();
+ ServerContext context = createMock(ServerContext.class);
String peerName = "peer";
String configuration = "java.lang.String,foo";
- data.put(Property.REPLICATION_PEERS + peerName, configuration);
- ConfigurationCopy conf = new ConfigurationCopy(data);
-
- ReplicationProcessor proc = new ReplicationProcessor(context, conf, fs);
+ var conf = new ConfigurationCopy(Map.of(Property.REPLICATION_PEERS + peerName, configuration));
+ expect(context.getConfiguration()).andReturn(conf);
+ replay(context);
+ ReplicationProcessor proc = new ReplicationProcessor(context);
assertEquals(configuration, proc.getPeerType(peerName));
+ verify(context);
}
@Test(expected = IllegalArgumentException.class)
public void noPeerConfigurationThrowsAnException() {
- VolumeManager fs = EasyMock.createMock(VolumeManager.class);
- ServerContext context = EasyMock.createMock(ServerContext.class);
-
- Map<String,String> data = new HashMap<>();
- ConfigurationCopy conf = new ConfigurationCopy(data);
+ ServerContext context = createMock(ServerContext.class);
- ReplicationProcessor proc = new ReplicationProcessor(context, conf, fs);
+ var conf = new ConfigurationCopy(Map.of());
+ expect(context.getConfiguration()).andReturn(conf);
+ replay(context);
+ ReplicationProcessor proc = new ReplicationProcessor(context);
proc.getPeerType("foo");
+ verify(context);
}
@Test
public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
- ReplicaSystem replica = EasyMock.createMock(ReplicaSystem.class);
- ReplicaSystemHelper helper = EasyMock.createMock(ReplicaSystemHelper.class);
- ReplicationProcessor proc = EasyMock.createMockBuilder(ReplicationProcessor.class)
+ ReplicaSystem replica = createMock(ReplicaSystem.class);
+ ReplicaSystemHelper helper = createMock(ReplicaSystemHelper.class);
+ ReplicationProcessor proc = createMockBuilder(ReplicationProcessor.class)
.addMockedMethods("getReplicaSystem", "doesFileExist", "getStatus", "getHelper")
.createMock();
@@ -85,16 +85,16 @@ public class ReplicationProcessorTest {
String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.toString(), target);
- EasyMock.expect(proc.getReplicaSystem(target)).andReturn(replica);
- EasyMock.expect(proc.getStatus(path.toString(), target)).andReturn(status);
- EasyMock.expect(proc.doesFileExist(path, target)).andReturn(true);
- EasyMock.expect(proc.getHelper()).andReturn(helper);
- EasyMock.expect(replica.replicate(path, status, target, helper)).andReturn(status);
+ expect(proc.getReplicaSystem(target)).andReturn(replica);
+ expect(proc.getStatus(path.toString(), target)).andReturn(status);
+ expect(proc.doesFileExist(path, target)).andReturn(true);
+ expect(proc.getHelper()).andReturn(helper);
+ expect(replica.replicate(path, status, target, helper)).andReturn(status);
- EasyMock.replay(replica, proc);
+ replay(replica, proc);
proc.process(queueKey, path.toString().getBytes(UTF_8));
- EasyMock.verify(replica, proc);
+ verify(replica, proc);
}
}
diff --git a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java
index 8774732..9f92105 100644
--- a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java
+++ b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java
@@ -47,7 +47,6 @@ import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.categories.StandaloneCapableClusterTests;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.AfterClass;
@@ -368,13 +367,4 @@ public abstract class AccumuloClusterHarness extends AccumuloITBase
return true;
}
- /**
- * Tries to give a reasonable directory which can be used to create temporary files for the test.
- * Makes a basic attempt to create the directory if it does not already exist.
- *
- * @return A directory which can be expected to exist on the Cluster's FileSystem
- */
- public Path getUsableDir() throws IllegalArgumentException {
- return cluster.getTemporaryPath();
- }
}
diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java
index 4777d75..ce61e70 100644
--- a/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java
@@ -69,7 +69,7 @@ public class BulkImportSequentialRowsIT extends AccumuloClusterHarness {
TableOperations to = client.tableOperations();
to.create(tableName);
FileSystem fs = getFileSystem();
- Path rootPath = new Path(fs.makeQualified(getUsableDir()), getClass().getSimpleName());
+ Path rootPath = new Path(cluster.getTemporaryPath(), getClass().getSimpleName());
log.info("Writing to {}", rootPath);
if (fs.exists(rootPath)) {
assertTrue(fs.delete(rootPath, true));
diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportVolumeIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportVolumeIT.java
index e073f77..3bae562 100644
--- a/test/src/main/java/org/apache/accumulo/test/BulkImportVolumeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BulkImportVolumeIT.java
@@ -72,8 +72,7 @@ public class BulkImportVolumeIT extends AccumuloClusterHarness {
try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
client.tableOperations().create(tableName);
FileSystem fs = getFileSystem();
- Path rootPath =
- new Path(fs.getUri().toString() + cluster.getTemporaryPath(), getClass().getName());
+ Path rootPath = new Path(cluster.getTemporaryPath(), getClass().getName());
fs.deleteOnExit(rootPath);
Path bulk = new Path(rootPath, "bulk");
diff --git a/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java b/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
index 0f68372..a253bbd 100644
--- a/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ImportExportIT.java
@@ -98,9 +98,8 @@ public class ImportExportIT extends AccumuloClusterHarness {
// Make a directory we can use to throw the export and import directories
// Must exist on the filesystem the cluster is running.
FileSystem fs = cluster.getFileSystem();
- Path tmp = cluster.getTemporaryPath();
log.info("Using FileSystem: " + fs);
- Path baseDir = new Path(fs.getUri().toString() + tmp, getClass().getName());
+ Path baseDir = new Path(cluster.getTemporaryPath(), getClass().getName());
fs.deleteOnExit(baseDir);
if (fs.exists(baseDir)) {
log.info("{} exists on filesystem, deleting", baseDir);
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
index 859901c..7fc0983 100644
--- a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -93,7 +92,6 @@ public class VolumeIT extends ConfigurableMacBase {
return 10 * 60;
}
- @SuppressWarnings("deprecation")
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
File baseDir = cfg.getDir();
@@ -111,9 +109,6 @@ public class VolumeIT extends ConfigurableMacBase {
}
// Run MAC on two locations in the local file system
- URI v1Uri = v1.toUri();
- cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath());
- cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost());
cfg.setProperty(Property.INSTANCE_VOLUMES, v1 + "," + v2);
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "15s");
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
index 4f473b9..342134f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkIT.java
@@ -69,7 +69,7 @@ public class BulkIT extends AccumuloClusterHarness {
static void runTest(AccumuloClient c, ClientInfo info, FileSystem fs, Path basePath,
String tableName, String filePrefix, String dirSuffix, boolean useOld) throws Exception {
c.tableOperations().create(tableName);
- Path base = new Path(fs.getUri().toString() + basePath, "testBulkFail_" + dirSuffix);
+ Path base = new Path(basePath, "testBulkFail_" + dirSuffix);
fs.delete(base, true);
fs.mkdirs(base);
fs.deleteOnExit(base);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java
index 478eca8..6a2ac52 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java
@@ -74,7 +74,7 @@ public class BulkOldIT extends AccumuloClusterHarness {
Configuration conf = new Configuration();
AccumuloConfiguration aconf = getCluster().getServerContext().getConfiguration();
FileSystem fs = getCluster().getFileSystem();
- String rootPath = fs.getUri().toString() + cluster.getTemporaryPath().toString();
+ String rootPath = cluster.getTemporaryPath().toString();
String dir = rootPath + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0];
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
index 5face15..4d28b43 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java
@@ -92,7 +92,7 @@ public class BulkSplitOptimizationIT extends AccumuloClusterHarness {
c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000");
c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G");
FileSystem fs = cluster.getFileSystem();
- Path testDir = new Path(fs.getUri().toString() + getUsableDir(), "testmf");
+ Path testDir = new Path(cluster.getTemporaryPath(), "testmf");
fs.deleteOnExit(testDir);
FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8);
FileStatus[] stats = fs.listStatus(testDir);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index dc48f1f..4d9e4dc 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -119,8 +119,7 @@ public class CompactionIT extends AccumuloClusterHarness {
c.tableOperations().create(tableName);
c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1.0");
FileSystem fs = getFileSystem();
- Path root =
- new Path(fs.getUri().toString() + cluster.getTemporaryPath(), getClass().getName());
+ Path root = new Path(cluster.getTemporaryPath(), getClass().getName());
fs.deleteOnExit(root);
Path testrf = new Path(root, "testrf");
fs.deleteOnExit(testrf);
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index 999ad89..462642e 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -96,7 +96,7 @@ public class NullTserver {
private long updateSession = 1;
public ThriftClientHandler(ServerContext context, TransactionWatcher watcher) {
- super(context, watcher, null);
+ super(context, watcher);
}
@Override