Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/02/08 20:41:39 UTC

[accumulo] branch 2.1 updated: Enable users to provide per-volume Hadoop Filesystem overrides (#3180)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 401270ada6 Enable users to provide per-volume Hadoop Filesystem overrides (#3180)
401270ada6 is described below

commit 401270ada6a9fb982c7ce9b7a76a641c47a13369
Author: Dave Marion <dl...@apache.org>
AuthorDate: Wed Feb 8 15:41:33 2023 -0500

    Enable users to provide per-volume Hadoop Filesystem overrides (#3180)
    
    When creating Hadoop FileSystem objects, the FileSystem class caches
    them keyed on the filesystem URI scheme and authority fields. Prior
    to this change, the Accumulo VolumeManager created FileSystem objects
    (one per volume) using the standard Hadoop configuration files and
    resources. However, user-supplied Hadoop configuration files
    (core-site.xml and hdfs-site.xml) don't allow the user to specify
    different properties for different filesystems. For example, a user
    can create core-site.xml and hdfs-site.xml files locally and create
    FileSystem objects that connect to different HDFS clusters, but a
    property specified in the Hadoop configuration will apply to all of
    those FileSystem objects.
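
    To illustrate, a minimal sketch (not part of this commit; the URIs
    are placeholders for two reachable HDFS clusters):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class FileSystemCacheSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // A single shared value; core-site.xml/hdfs-site.xml offer no
        // way to express a different value per cluster.
        conf.setInt("dfs.client.hedged.read.threadpool.size", 10);
        // Cached as two entries (the authorities differ), but both are
        // created from the same Configuration, so both get a hedged read
        // pool size of 10.
        FileSystem fsA = FileSystem.get(URI.create("hdfs://namespace-a:8020/"), conf);
        FileSystem fsB = FileSystem.get(URI.create("hdfs://namespace-b:8020/"), conf);
      }
    }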
    
    With this change users can specify Hadoop configuration overrides
    in the Accumulo configuration using the property format:
    instance.volume.config.<volume-uri>.<property-name>=<property-value>
    
    This enables a user with multiple filesystems specified in the
    instance.volumes property to provide different Hadoop configuration
    overrides for each volume. For example, the following would allow
    different hedged read thread sizes for two different volumes:
    
    instance.volumes=hdfs://namespace-a:8020/accumulo,hdfs://namespace-b:8020/accumulo
    instance.volume.config.hdfs://namespace-a:8020/accumulo.dfs.client.hedged.read.threadpool.size=10
    instance.volume.config.hdfs://namespace-b:8020/accumulo.dfs.client.hedged.read.threadpool.size=20
    
    Note: When specifying property names that contain colons in the
    accumulo.properties file, the colons need to be escaped with a backslash.
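
    For example, the overrides above would be written in accumulo.properties
    (which follows java.util.Properties escaping rules) as:

    instance.volume.config.hdfs\://namespace-a\:8020/accumulo.dfs.client.hedged.read.threadpool.size=10
    instance.volume.config.hdfs\://namespace-b\:8020/accumulo.dfs.client.hedged.read.threadpool.size=20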
---
 .../org/apache/accumulo/core/conf/Property.java    |  10 ++
 .../miniclusterImpl/MiniAccumuloClusterImpl.java   |   5 +
 .../miniclusterImpl/MiniAccumuloConfigImpl.java    |  24 ++++
 .../accumulo/server/fs/VolumeManagerImpl.java      |  60 ++++++++-
 .../accumulo/server/fs/VolumeManagerImplTest.java  | 137 +++++++++++++++++++
 .../org/apache/accumulo/test/VolumeManagerIT.java  | 147 +++++++++++++++++++++
 6 files changed, 381 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index d7cc2748a8..3417cea934 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -120,6 +120,16 @@ public enum Property {
           + " a comma or other reserved characters in a URI use standard URI hex"
           + " encoding. For example replace commas with %2C.",
       "1.6.0"),
+  INSTANCE_VOLUME_CONFIG_PREFIX("instance.volume.config.", null, PropertyType.PREFIX,
+      "Properties in this category are used to provide volume specific overrides to "
+          + "the general filesystem client configuration. Properties using this prefix "
+          + "should be in the form "
+          + "'instance.volume.config.<volume-uri>.<property-name>=<property-value>. An "
+          + "example: "
+          + "'instance.volume.config.hdfs://namespace-a:8020/accumulo.dfs.client.hedged.read.threadpool.size=10'. "
+          + "Note that when specifying property names that contain colons in the properties "
+          + "files that the colons need to be escaped with a backslash.",
+      "2.1.1"),
   INSTANCE_VOLUMES_REPLACEMENTS("instance.volumes.replacements", "", PropertyType.STRING,
       "Since accumulo stores absolute URIs changing the location of a namenode "
           + "could prevent Accumulo from starting. The property helps deal with "
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 4d5067a895..a5f8b08b08 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -219,6 +219,11 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster {
       dfsUri = "file:///";
     }
 
+    // Perform any modifications to the site config that need to happen
+    // after the instance volumes are set, and before the config is
+    // written out and MAC started.
+    config.preStartConfigUpdate();
+
     File clientConfFile = config.getClientConfFile();
     // Write only the properties that correspond to ClientConfiguration properties
     writeConfigProperties(clientConfFile,
diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 6578052b83..69f3bc99d4 100644
--- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
@@ -47,6 +48,7 @@ import org.slf4j.LoggerFactory;
  * @since 1.6.0
  */
 public class MiniAccumuloConfigImpl {
+
   private static final Logger log = LoggerFactory.getLogger(MiniAccumuloConfigImpl.class);
   private static final String DEFAULT_INSTANCE_SECRET = "DONTTELL";
 
@@ -97,6 +99,8 @@ public class MiniAccumuloConfigImpl {
   private Configuration hadoopConf;
   private SiteConfiguration accumuloConf;
 
+  private Consumer<MiniAccumuloConfigImpl> preStartConfigProcessor;
+
   /**
    * @param dir An empty or nonexistent directory that Accumulo and Zookeeper can store data in.
    *        Creating the directory is left to the user. Java 7, Guava, and Junit provide methods for
@@ -812,4 +816,24 @@ public class MiniAccumuloConfigImpl {
   public void setNumCompactors(int numCompactors) {
     this.numCompactors = numCompactors;
   }
+
+  /**
+   * Set the object that will be used to modify the site configuration right before it's written
+   * out to a file. This is useful when the configuration needs to be updated based on a property
+   * that is set in MiniAccumuloClusterImpl, such as instance.volumes.
+   *
+   */
+  public void setPreStartConfigProcessor(Consumer<MiniAccumuloConfigImpl> processor) {
+    this.preStartConfigProcessor = processor;
+  }
+
+  /**
+   * Called by MiniAccumuloClusterImpl after all modifications are done to the configuration and
+   * right before it's written out to a file.
+   */
+  public void preStartConfigUpdate() {
+    if (this.preStartConfigProcessor != null) {
+      this.preStartConfigProcessor.accept(this);
+    }
+  }
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
index b999bbea74..ab0d86f6a9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
@@ -76,6 +77,7 @@ public class VolumeManagerImpl implements VolumeManager {
   private final Map<String,Volume> volumesByName;
   private final Multimap<URI,Volume> volumesByFileSystemUri;
   private final VolumeChooser chooser;
+  private final AccumuloConfiguration conf;
   private final Configuration hadoopConf;
 
   protected VolumeManagerImpl(Map<String,Volume> volumes, AccumuloConfiguration conf,
@@ -98,6 +100,7 @@ public class VolumeManagerImpl implements VolumeManager {
           "Failed to load volume chooser specified by " + Property.GENERAL_VOLUME_CHOOSER);
     }
     chooser = chooser1;
+    this.conf = conf;
     this.hadoopConf = hadoopConf;
   }
 
@@ -249,7 +252,14 @@ public class VolumeManagerImpl implements VolumeManager {
   public FileSystem getFileSystemByPath(Path path) {
     FileSystem desiredFs;
     try {
-      desiredFs = requireNonNull(path).getFileSystem(hadoopConf);
+      Configuration volumeConfig = hadoopConf;
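+      // If the path belongs to one of the configured volumes, resolve its
+      // FileSystem using that volume's Hadoop configuration (with any
+      // per-volume overrides applied) instead of the base configuration.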
+      for (String vol : volumesByName.keySet()) {
+        if (path.toString().startsWith(vol)) {
+          volumeConfig = getVolumeManagerConfiguration(conf, hadoopConf, vol);
+          break;
+        }
+      }
+      desiredFs = requireNonNull(path).getFileSystem(volumeConfig);
     } catch (IOException ex) {
       throw new UncheckedIOException(ex);
     }
@@ -353,12 +363,56 @@ public class VolumeManagerImpl implements VolumeManager {
     return getFileSystemByPath(path).getDefaultReplication(path);
   }
 
+  /**
+   * The Hadoop Configuration object does not currently allow for duplicate properties to be set in
+   * a single Configuration for different FileSystem URIs. Here we will look for properties in the
+   * Accumulo configuration of the form:
+   *
+   * <pre>
+   * instance.volume.config.&lt;volume-uri&gt;.&lt;hdfs-property&gt;
+   * </pre>
+   *
+   * We will use these properties to return a new Configuration object that can be used with the
+   * FileSystem URI.
+   *
+   * @param conf AccumuloConfiguration object
+   * @param hadoopConf Hadoop Configuration object
+   * @param filesystemURI Volume Filesystem URI
+   * @return Hadoop Configuration with custom overrides for this FileSystem
+   */
+  private static Configuration getVolumeManagerConfiguration(AccumuloConfiguration conf,
+      final Configuration hadoopConf, final String filesystemURI) {
+
+    final Configuration volumeConfig = new Configuration(hadoopConf);
+
+    conf.getAllPropertiesWithPrefixStripped(Property.INSTANCE_VOLUME_CONFIG_PREFIX).entrySet()
+        .stream().filter(e -> e.getKey().startsWith(filesystemURI + ".")).forEach(e -> {
+          String key = e.getKey().substring(filesystemURI.length() + 1);
+          String value = e.getValue();
+          log.info("Overriding property {} = {} for volume {}", key, value, filesystemURI);
+          volumeConfig.set(key, value);
+        });
+
+    return volumeConfig;
+  }
+
+  protected static Stream<Entry<String,String>>
+      findVolumeOverridesMissingVolume(AccumuloConfiguration conf, Set<String> definedVolumes) {
+    return conf.getAllPropertiesWithPrefixStripped(Property.INSTANCE_VOLUME_CONFIG_PREFIX)
+        .entrySet().stream()
+        // keep only overrides whose key is not prefixed by any defined volume (plus a dot)
+        .filter(e -> definedVolumes.stream().noneMatch(vol -> e.getKey().startsWith(vol + ".")));
+  }
+
   public static VolumeManager get(AccumuloConfiguration conf, final Configuration hadoopConf)
       throws IOException {
     final Map<String,Volume> volumes = new HashMap<>();
 
     Set<String> volumeStrings = VolumeConfiguration.getVolumeUris(conf);
 
+    findVolumeOverridesMissingVolume(conf, volumeStrings).forEach(
+        e -> log.warn("Found no matching volume for volume config override property {}", e));
+
     // The "default" Volume for Accumulo (in case no volumes are specified)
     for (String volumeUriOrDir : volumeStrings) {
       if (volumeUriOrDir.isBlank()) {
@@ -371,7 +425,9 @@ public class VolumeManagerImpl implements VolumeManager {
 
       // We require a URI here, fail if it doesn't look like one
       if (volumeUriOrDir.contains(":")) {
-        volumes.put(volumeUriOrDir, new VolumeImpl(new Path(volumeUriOrDir), hadoopConf));
+        Configuration volumeConfig =
+            getVolumeManagerConfiguration(conf, hadoopConf, volumeUriOrDir);
+        volumes.put(volumeUriOrDir, new VolumeImpl(new Path(volumeUriOrDir), volumeConfig));
       } else {
         throw new IllegalArgumentException("Expected fully qualified URI for "
             + Property.INSTANCE_VOLUMES.getKey() + " got " + volumeUriOrDir);
diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
index 14ad8deea0..a112a3c906 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/fs/VolumeManagerImplTest.java
@@ -18,12 +18,23 @@
  */
 package org.apache.accumulo.server.fs;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.Property;
@@ -31,7 +42,10 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 import org.apache.accumulo.core.spi.fs.VolumeChooser;
 import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
+import org.apache.accumulo.core.volume.Volume;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
 
@@ -104,4 +118,127 @@ public class VolumeManagerImplTest {
       assertThrows(RuntimeException.class, () -> vm.choose(chooserEnv, volumes));
     }
   }
+
+  @Test
+  public void testFindOverridesWithoutVolumes() throws Exception {
+    final String vol1 = "file://127.0.0.1/vol1/";
+    final String vol2 = "file://localhost/vol2/";
+    ConfigurationCopy conf = new ConfigurationCopy();
+    conf.set(Property.INSTANCE_VOLUMES, String.join(",", vol1));
+    conf.set(Property.GENERAL_VOLUME_CHOOSER, Property.GENERAL_VOLUME_CHOOSER.getDefaultValue());
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1 + "." + THREADPOOL_SIZE_KEY,
+        "10");
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol2 + "." + THREADPOOL_SIZE_KEY,
+        "20");
+
+    List<Entry<String,String>> properties = VolumeManagerImpl
+        .findVolumeOverridesMissingVolume(conf, Set.of(vol1)).collect(Collectors.toList());
+
+    assertNotNull(properties);
+    assertEquals(1, properties.size());
+    Entry<String,String> e = properties.get(0);
+    assertEquals(vol2 + "." + THREADPOOL_SIZE_KEY, e.getKey());
+    assertEquals("20", e.getValue());
+  }
+
+  @Test
+  public void testConfigurationOverrides() throws Exception {
+
+    final String vol1 = "file://127.0.0.1/vol1/";
+    final String vol2 = "file://localhost/vol2/";
+    final String vol3 = "hdfs://127.0.0.1/accumulo";
+    final String vol4 = "hdfs://localhost/accumulo";
+    final String vol5 = "hdfs://localhost:8020/vol3";
+    final String vol6 = "hdfs://localhost:8020/vol4";
+
+    ConfigurationCopy conf = new ConfigurationCopy();
+    conf.set(Property.INSTANCE_VOLUMES, String.join(",", vol1, vol2, vol3, vol4));
+    conf.set(Property.GENERAL_VOLUME_CHOOSER, Property.GENERAL_VOLUME_CHOOSER.getDefaultValue());
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1 + "." + THREADPOOL_SIZE_KEY,
+        "10");
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1 + "."
+        + DFS_CLIENT_CACHE_DROP_BEHIND_READS, "true");
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1 + "." + DFS_BLOCK_SIZE_KEY,
+        "268435456");
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol2 + "." + THREADPOOL_SIZE_KEY,
+        "20");
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol2 + "."
+        + DFS_CLIENT_CACHE_DROP_BEHIND_READS, "false");
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol3 + "." + THREADPOOL_SIZE_KEY,
+        "30");
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol3 + "."
+        + DFS_CLIENT_CACHE_DROP_BEHIND_READS, "TRUE");
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol4 + "." + THREADPOOL_SIZE_KEY,
+        "40");
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol4 + "."
+        + DFS_CLIENT_CACHE_DROP_BEHIND_READS, "FALSE");
+    // Setting this property should result in a warning in the log because there is no matching
+    // vol6 in instance.volumes. There is no warning for vol5 because there is no override for
+    // vol5.
+    conf.set(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol6 + "." + DFS_REPLICATION_KEY,
+        "45");
+
+    VolumeManager vm = VolumeManagerImpl.get(conf, hadoopConf);
+
+    FileSystem fs1 = vm.getFileSystemByPath(new Path(vol1));
+    Configuration conf1 = fs1.getConf();
+
+    FileSystem fs2 = vm.getFileSystemByPath(new Path(vol2));
+    Configuration conf2 = fs2.getConf();
+
+    FileSystem fs3 = vm.getFileSystemByPath(new Path(vol3));
+    Configuration conf3 = fs3.getConf();
+
+    FileSystem fs4 = vm.getFileSystemByPath(new Path(vol4));
+    Configuration conf4 = fs4.getConf();
+
+    FileSystem fs5 = vm.getFileSystemByPath(new Path(vol5));
+    Configuration conf5 = fs5.getConf();
+
+    assertEquals("10", conf1.get(THREADPOOL_SIZE_KEY));
+    assertEquals("true", conf1.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+    assertEquals("268435456", conf1.get(DFS_BLOCK_SIZE_KEY));
+
+    assertEquals("20", conf2.get(THREADPOOL_SIZE_KEY));
+    assertEquals("false", conf2.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+    assertNull(conf2.get(DFS_BLOCK_SIZE_KEY));
+
+    assertEquals("30", conf3.get(THREADPOOL_SIZE_KEY));
+    assertEquals("TRUE", conf3.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+    assertNull(conf3.get(DFS_BLOCK_SIZE_KEY));
+
+    assertEquals("40", conf4.get(THREADPOOL_SIZE_KEY));
+    assertEquals("FALSE", conf4.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+    assertNull(conf4.get(DFS_BLOCK_SIZE_KEY));
+
+    assertNull(conf5.get(THREADPOOL_SIZE_KEY));
+    assertNull(conf5.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+    assertNull(conf5.get(DFS_BLOCK_SIZE_KEY));
+
+    Collection<Volume> vols = vm.getVolumes();
+    assertEquals(4, vols.size());
+    vols.forEach(v -> {
+      if (v.containsPath(new Path(vol1))) {
+        assertEquals("10", v.getFileSystem().getConf().get(THREADPOOL_SIZE_KEY));
+        assertEquals("true", v.getFileSystem().getConf().get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+        assertEquals("268435456", v.getFileSystem().getConf().get(DFS_BLOCK_SIZE_KEY));
+      } else if (v.containsPath(new Path(vol2))) {
+        assertEquals("20", v.getFileSystem().getConf().get(THREADPOOL_SIZE_KEY));
+        assertEquals("false", v.getFileSystem().getConf().get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+        assertNull(v.getFileSystem().getConf().get(DFS_BLOCK_SIZE_KEY));
+      } else if (v.containsPath(new Path(vol3))) {
+        assertEquals("30", v.getFileSystem().getConf().get(THREADPOOL_SIZE_KEY));
+        assertEquals("TRUE", v.getFileSystem().getConf().get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+        assertNull(v.getFileSystem().getConf().get(DFS_BLOCK_SIZE_KEY));
+      } else if (v.containsPath(new Path(vol4))) {
+        assertEquals("40", v.getFileSystem().getConf().get(THREADPOOL_SIZE_KEY));
+        assertEquals("FALSE", v.getFileSystem().getConf().get(DFS_CLIENT_CACHE_DROP_BEHIND_READS));
+        assertNull(v.getFileSystem().getConf().get(DFS_BLOCK_SIZE_KEY));
+      } else {
+        fail("Unhandled volume: " + v);
+      }
+    });
+
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeManagerIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeManagerIT.java
new file mode 100644
index 0000000000..4a3002bd07
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeManagerIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.jupiter.api.Test;
+
+public class VolumeManagerIT extends ConfigurableMacBase {
+
+  private String vol1;
+  private String vol2;
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.useMiniDFS(true);
+    cfg.setPreStartConfigProcessor((config) -> {
+      vol1 = config.getSiteConfig().get(Property.INSTANCE_VOLUMES.getKey());
+      assertTrue(vol1.contains("localhost"));
+      vol2 = vol1.replace("localhost", "127.0.0.1");
+      config.setProperty(Property.INSTANCE_VOLUMES.getKey(), String.join(",", vol2, vol1));
+
+      // Set Volume specific HDFS overrides
+      config.setProperty("general.volume.chooser",
+          "org.apache.accumulo.core.spi.fs.PreferredVolumeChooser");
+      config.setProperty("general.custom.volume.preferred.default", vol1);
+      config.setProperty("general.custom.volume.preferred.logger", vol2);
+
+      // Need to escape the colons in the custom property volume because it's part of the key. It's
+      // being written out to a file and read in using the Properties object.
+      String vol1FileOutput = vol1.replaceAll(":", "\\\\:");
+      String vol2FileOutput = vol2.replaceAll(":", "\\\\:");
+      config.setProperty(
+          Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1FileOutput + ".dfs.blocksize",
+          "10485760");
+      config.setProperty(
+          Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol2FileOutput + ".dfs.blocksize",
+          "51200000");
+      config.setProperty(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1FileOutput
+          + ".dfs.client.use.datanode.hostname", "true");
+      config.setProperty(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol2FileOutput
+          + ".dfs.client.use.datanode.hostname", "false");
+      config.setProperty(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1FileOutput
+          + ".dfs.client.hedged.read.threadpool.size", "0");
+      config.setProperty(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol2FileOutput
+          + ".dfs.client.hedged.read.threadpool.size", "1");
+    });
+  }
+
+  @Test
+  public void testHdfsConfigOverrides() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
+
+      Map<String,String> siteConfig = c.instanceOperations().getSiteConfiguration();
+      assertEquals("10485760", siteConfig
+          .get(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1 + ".dfs.blocksize"));
+      assertEquals("51200000", siteConfig
+          .get(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol2 + ".dfs.blocksize"));
+      assertEquals("true", siteConfig.get(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1
+          + ".dfs.client.use.datanode.hostname"));
+      assertEquals("false", siteConfig.get(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol2
+          + ".dfs.client.use.datanode.hostname"));
+      assertEquals("0", siteConfig.get(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol1
+          + ".dfs.client.hedged.read.threadpool.size"));
+      assertEquals("1", siteConfig.get(Property.INSTANCE_VOLUME_CONFIG_PREFIX.getKey() + vol2
+          + ".dfs.client.hedged.read.threadpool.size"));
+
+      String[] names = getUniqueNames(2);
+      String t1 = names[0];
+      String t2 = names[1];
+
+      NewTableConfiguration ntc1 = new NewTableConfiguration();
+      ntc1.setProperties(Map.of("table.custom.volume.preferred", vol1));
+      c.tableOperations().create(t1, ntc1);
+      ReadWriteIT.ingest(c, 10, 10, 100, 0, t1);
+      c.tableOperations().flush(t1, null, null, true);
+
+      NewTableConfiguration ntc2 = new NewTableConfiguration();
+      ntc2.setProperties(Map.of("table.custom.volume.preferred", vol2));
+      c.tableOperations().create(t2, ntc2);
+      ReadWriteIT.ingest(c, 10, 10, 100, 0, t2);
+      c.tableOperations().flush(t2, null, null, true);
+
+      String tid1 = c.tableOperations().tableIdMap().get(t1);
+      String tid2 = c.tableOperations().tableIdMap().get(t2);
+
+      assertNotNull(tid1);
+      assertNotNull(tid2);
+
+      // Confirm that table 1 has a block size of 10485760
+      FileSystem fs = this.cluster.getMiniDfs().getFileSystem();
+      RemoteIterator<LocatedFileStatus> iter1 =
+          fs.listFiles(new Path("/accumulo/tables/" + tid1), true);
+      while (iter1.hasNext()) {
+        LocatedFileStatus stat = iter1.next();
+        if (stat.isFile()) {
+          assertEquals(10485760, stat.getBlockSize());
+        }
+      }
+
+      // Confirm that table 2 has a block size of 51200000
+      RemoteIterator<LocatedFileStatus> iter2 =
+          fs.listFiles(new Path("/accumulo/tables/" + tid2), true);
+      while (iter2.hasNext()) {
+        LocatedFileStatus stat = iter2.next();
+        if (stat.isFile()) {
+          assertEquals(51200000, stat.getBlockSize());
+        }
+      }
+
+    }
+
+  }
+
+}