You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/10/02 14:49:46 UTC

[accumulo] branch master updated: Add space aware volume chooser (#645)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b50114  Add space aware volume chooser (#645)
5b50114 is described below

commit 5b5011410145b4a68365f42a566983bc1834877b
Author: alerman <al...@gmail.com>
AuthorDate: Tue Oct 2 10:49:38 2018 -0400

    Add space aware volume chooser (#645)
---
 .../server/fs/SpaceAwareVolumeChooser.java         | 143 +++++++++++++
 .../server/fs/SpaceAwareVolumeChooserTest.java     | 222 +++++++++++++++++++++
 2 files changed, 365 insertions(+)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java
new file mode 100644
index 0000000..2982459
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * A {@link PreferredVolumeChooser} that takes remaining HDFS space into account when making a
+ * volume choice rather than a simpler round robin. The list of volumes to use can be limited using
+ * the same properties as {@link PreferredVolumeChooser}.
+ *
+ * <p>
+ * Each candidate volume is weighted by the fraction of its capacity that is still free, so volumes
+ * with more free space are proportionally more likely to be chosen. The weights are cached per list
+ * of candidate volumes and recomputed after {@link #HDFS_SPACE_RECOMPUTE_INTERVAL} milliseconds
+ * (5 minutes by default).
+ */
+public class SpaceAwareVolumeChooser extends PreferredVolumeChooser {
+
+  /**
+   * Property naming how long, in milliseconds, computed free-space weights are cached before being
+   * recomputed. A blank or unset value selects the 5 minute default.
+   */
+  public static final String HDFS_SPACE_RECOMPUTE_INTERVAL = Property.GENERAL_ARBITRARY_PROP_PREFIX
+      .getKey() + "spaceaware.volume.chooser.recompute.interval";
+
+  // Default time to wait before recomputing free space, in ms. Defaults to 5 min
+  private static final long DEFAULT_COMPUTATION_CACHE_DURATION = TimeUnit.MINUTES.toMillis(5);
+
+  // Created lazily by getCache(); keyed by the list of preferred candidate volumes
+  LoadingCache<List<String>,WeightedRandomCollection> choiceCache = null;
+
+  private static final Logger log = LoggerFactory.getLogger(SpaceAwareVolumeChooser.class);
+
+  /**
+   * Filters {@code options} through {@code getPreferredVolumes}, then picks one of the remaining
+   * volumes at random, weighted by the fraction of free space on each.
+   *
+   * @param env
+   *          environment used to read configuration and to look up each volume's file system
+   * @param options
+   *          candidate volumes
+   * @return the chosen volume
+   * @throws IllegalStateException
+   *           if loading the weights fails with a checked exception; unchecked failures (such as
+   *           no volume having free space) surface from the cache as Guava's
+   *           {@code UncheckedExecutionException}
+   */
+  @Override
+  public String choose(VolumeChooserEnvironment env, String[] options)
+      throws VolumeChooserException {
+
+    options = getPreferredVolumes(env, options);
+
+    try {
+      // Key the cache on a private copy so later changes to the array cannot corrupt the entry
+      return getCache(env).get(Arrays.asList(options.clone())).next();
+    } catch (ExecutionException e) {
+      throw new IllegalStateException("Execution exception when attempting to cache choice", e);
+    }
+  }
+
+  /**
+   * Returns the weight cache, creating it on first use with the recompute interval read from the
+   * system configuration.
+   *
+   * <p>
+   * NOTE(review): the interval and the {@code env} used by the cache loader are taken from the
+   * first call only; later calls with a different environment reuse them.
+   */
+  private synchronized LoadingCache<List<String>,WeightedRandomCollection> getCache(
+      VolumeChooserEnvironment env) {
+
+    if (choiceCache == null) {
+      ServerConfigurationFactory scf = loadConfFactory(env);
+      AccumuloConfiguration systemConfiguration = scf.getSystemConfiguration();
+      String propertyValue = systemConfiguration.get(HDFS_SPACE_RECOMPUTE_INTERVAL);
+
+      long computationCacheDuration = StringUtils.isNotBlank(propertyValue)
+          ? Long.parseLong(propertyValue.trim())
+          : DEFAULT_COMPUTATION_CACHE_DURATION;
+
+      choiceCache = CacheBuilder.newBuilder()
+          .expireAfterWrite(computationCacheDuration, TimeUnit.MILLISECONDS)
+          .build(new CacheLoader<List<String>,WeightedRandomCollection>() {
+            @Override
+            public WeightedRandomCollection load(List<String> key) {
+              return new WeightedRandomCollection(key, env);
+            }
+          });
+    }
+
+    return choiceCache;
+  }
+
+  /**
+   * Volumes weighted by their free-space fraction, from which {@link #next()} picks one at random
+   * with probability proportional to its weight.
+   */
+  public class WeightedRandomCollection {
+    // Maps each running (cumulative) weight total to the volume whose weight ends at that total
+    private final NavigableMap<Double,String> map = new TreeMap<>();
+    private final Random random;
+    private double total = 0;
+
+    /**
+     * Builds the collection by querying the file system status of every option.
+     *
+     * @param options
+     *          candidate volumes; must not be empty
+     * @param env
+     *          environment whose server context supplies the {@link VolumeManager}
+     * @throws IllegalStateException
+     *           if {@code options} is empty or no volume ends up with a positive weight
+     */
+    public WeightedRandomCollection(List<String> options, VolumeChooserEnvironment env) {
+      this.random = new Random();
+
+      if (options.size() < 1) {
+        throw new IllegalStateException("Options was empty! No valid volumes to choose from.");
+      }
+
+      VolumeManager manager = env.getServerContext().getVolumeManager();
+
+      // Compute the fraction of space available on each volume
+      for (String option : options) {
+        FileSystem pathFs = manager.getVolumeByPath(new Path(option)).getFileSystem();
+        try {
+          FsStatus optionStatus = pathFs.getStatus();
+          long remaining = optionStatus.getRemaining();
+          long capacity = optionStatus.getCapacity();
+          if (capacity <= 0) {
+            // Dividing by a non-positive capacity would give a NaN, infinite or negative weight
+            log.warn("Volume {} reported a capacity of {}. Not adding it.", option, capacity);
+            continue;
+          }
+          add((double) remaining / capacity, option);
+        } catch (IOException e) {
+          log.error("Unable to get file system status for {}", option, e);
+        }
+      }
+
+      if (map.size() < 1) {
+        throw new IllegalStateException(
+            "Weighted options was empty! Could indicate an issue getting file system status or "
+                + "no free space on any volume");
+      }
+    }
+
+    /**
+     * Adds a volume with the given weight. Weights that are not positive and finite are ignored.
+     *
+     * @param weight
+     *          relative likelihood of choosing {@code result}
+     * @param result
+     *          the volume to add
+     * @return this collection, for chaining
+     */
+    public WeightedRandomCollection add(double weight, String result) {
+      // !(weight > 0) also rejects NaN, which would otherwise poison the running total
+      if (!(weight > 0) || Double.isInfinite(weight)) {
+        log.info("Weight was {}. Not adding {}", weight, result);
+        return this;
+      }
+      total += weight;
+      map.put(total, result);
+      return this;
+    }
+
+    /**
+     * Picks a volume at random with probability proportional to its weight.
+     *
+     * @return the chosen volume
+     */
+    public String next() {
+      // value lies in [0, total); the smallest cumulative key strictly above it owns that interval
+      double value = random.nextDouble() * total;
+      return map.higherEntry(value).getValue();
+    }
+  }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java
new file mode 100644
index 0000000..1b4f032
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+/**
+ * Tests {@link SpaceAwareVolumeChooser} against mocked volumes whose reported free space is set by
+ * each test, checking that choices are spread across volumes in proportion to free space.
+ */
+public class SpaceAwareVolumeChooserTest {
+  private VolumeManager volumeManager = null;
+  private VolumeChooserEnvironment chooserEnv = null;
+  private ServerContext serverContext = null;
+  private ServerConfigurationFactory serverConfigurationFactory = null;
+  private AccumuloConfiguration sysConfig = null;
+  private Volume vol1 = null;
+  private Volume vol2 = null;
+  private FileSystem fs1 = null;
+  private FileSystem fs2 = null;
+  private FsStatus status1 = null;
+  private FsStatus status2 = null;
+
+  // Number of choices made by makeChoices()
+  private final int iterations = 1000;
+
+  private final String volumeOne = "hdfs://nn1:8020/apps/accumulo1/tables";
+  private final String volumeTwo = "hdfs://nn2:8020/applications/accumulo/tables";
+
+  // Different volumes with different paths
+  private final String[] tableDirs = {volumeOne, volumeTwo};
+
+  // How many times makeChoices() picked each volume
+  private int vol1Count = 0;
+  private int vol2Count = 0;
+
+  @Before
+  public void beforeTest() {
+    volumeManager = EasyMock.createMock(VolumeManager.class);
+    serverContext = EasyMock.createMock(ServerContext.class);
+    serverConfigurationFactory = EasyMock.createMock(ServerConfigurationFactory.class);
+    sysConfig = EasyMock.createMock(AccumuloConfiguration.class);
+    vol1 = EasyMock.createMock(Volume.class);
+    vol2 = EasyMock.createMock(Volume.class);
+    fs1 = EasyMock.createMock(FileSystem.class);
+    fs2 = EasyMock.createMock(FileSystem.class);
+    status1 = EasyMock.createMock(FsStatus.class);
+    status2 = EasyMock.createMock(FsStatus.class);
+    chooserEnv = new VolumeChooserEnvironment(VolumeChooserEnvironment.ChooserScope.DEFAULT,
+        serverContext);
+  }
+
+  /**
+   * Records the mock expectations for one test and switches every mock to replay mode.
+   *
+   * @param percentage1
+   *          space remaining on volume 1, out of a reported capacity of 100
+   * @param percentage2
+   *          space remaining on volume 2, out of a reported capacity of 100
+   * @param cacheDuration
+   *          value returned for {@link SpaceAwareVolumeChooser#HDFS_SPACE_RECOMPUTE_INTERVAL};
+   *          {@code null} selects the chooser's default and {@code "0"} disables caching
+   * @param timesToCallPreferredVolumeChooser
+   *          exact number of reads expected of the DEFAULT-scope preferred volumes property
+   * @param anyTimes
+   *          when true, bounds the configuration lookups by {@code iterations + 2} instead of
+   *          {@code timesToCallPreferredVolumeChooser + iterations}
+   */
+  private void testSpecificSetup(long percentage1, long percentage2, String cacheDuration,
+      int timesToCallPreferredVolumeChooser, boolean anyTimes) throws IOException {
+    int min = 1;
+    int max = iterations + 1;
+    int updatePropertyMax = anyTimes ? max + 1 : timesToCallPreferredVolumeChooser + iterations;
+
+    // Volume 1 has percentage1 units free out of a capacity of 100
+    EasyMock.expect(status1.getRemaining()).andReturn(percentage1).times(min, max);
+    EasyMock.expect(status1.getCapacity()).andReturn(100L).times(min, max);
+
+    // Volume 2 has percentage2 units free out of a capacity of 100
+    EasyMock.expect(status2.getRemaining()).andReturn(percentage2).times(min, max);
+    EasyMock.expect(status2.getCapacity()).andReturn(100L).times(min, max);
+
+    // The recompute interval is only read when the chooser first builds its cache
+    EasyMock.expect(sysConfig.get(SpaceAwareVolumeChooser.HDFS_SPACE_RECOMPUTE_INTERVAL))
+        .andReturn(cacheDuration).times(1);
+    EasyMock
+        .expect(sysConfig.get(PreferredVolumeChooser
+            .getPropertyNameForScope(VolumeChooserEnvironment.ChooserScope.DEFAULT)))
+        .andReturn(String.join(",", tableDirs)).times(timesToCallPreferredVolumeChooser);
+
+    EasyMock.expect(serverContext.getVolumeManager()).andReturn(volumeManager).times(min,
+        Math.max(max, updatePropertyMax));
+    EasyMock.expect(serverContext.getServerConfFactory()).andReturn(serverConfigurationFactory)
+        .times(min, updatePropertyMax);
+    EasyMock.expect(serverConfigurationFactory.getSystemConfiguration()).andReturn(sysConfig)
+        .times(1, updatePropertyMax);
+
+    EasyMock.expect(volumeManager.getVolumeByPath(new Path(volumeOne))).andReturn(vol1).times(min,
+        max);
+    EasyMock.expect(volumeManager.getVolumeByPath(new Path(volumeTwo))).andReturn(vol2).times(min,
+        max);
+    EasyMock.expect(vol1.getFileSystem()).andReturn(fs1).times(min, max);
+    EasyMock.expect(vol2.getFileSystem()).andReturn(fs2).times(min, max);
+    EasyMock.expect(fs1.getStatus()).andReturn(status1).times(min, max);
+    EasyMock.expect(fs2.getStatus()).andReturn(status2).times(min, max);
+
+    EasyMock.replay(serverContext, vol1, vol2, fs1, fs2, status1, status2, volumeManager,
+        serverConfigurationFactory, sysConfig);
+  }
+
+  @After
+  public void afterTest() {
+    // JUnit 4 creates a new instance for each test method, so no field reset is needed here
+    EasyMock.verify(serverContext, vol1, vol2, fs1, fs2, status1, status2, volumeManager,
+        serverConfigurationFactory, sysConfig);
+  }
+
+  @Test
+  public void testEvenWeightsWithCaching() throws IOException {
+    testSpecificSetup(10L, 10L, null, iterations, false);
+
+    makeChoices();
+
+    assertEquals(iterations / 2, vol1Count, iterations / 10);
+    assertEquals(iterations / 2, vol2Count, iterations / 10);
+  }
+
+  @Test
+  public void testEvenWeightsNoCaching() throws IOException {
+    testSpecificSetup(10L, 10L, "0", iterations, true);
+
+    makeChoices();
+
+    assertEquals(iterations / 2, vol1Count, iterations / 10);
+    assertEquals(iterations / 2, vol2Count, iterations / 10);
+  }
+
+  @Test(expected = UncheckedExecutionException.class)
+  public void testNoFreeSpace() throws IOException {
+    // With no space on either volume the chooser has nothing to pick and fails on first use
+    testSpecificSetup(0L, 0L, null, 1, false);
+
+    makeChoices();
+  }
+
+  @Test
+  public void testNinetyTen() throws IOException {
+    testSpecificSetup(90L, 10L, null, iterations, false);
+
+    makeChoices();
+
+    assertEquals(iterations * .9, vol1Count, iterations / 10);
+    assertEquals(iterations * .1, vol2Count, iterations / 10);
+  }
+
+  @Test
+  public void testTenNinety() throws IOException {
+    testSpecificSetup(10L, 90L, null, iterations, false);
+
+    makeChoices();
+
+    assertEquals(iterations * .1, vol1Count, iterations / 10);
+    assertEquals(iterations * .9, vol2Count, iterations / 10);
+  }
+
+  @Test
+  public void testWithNoCaching() throws IOException {
+    testSpecificSetup(10L, 90L, "0", iterations, true);
+
+    makeChoices();
+
+    assertEquals(iterations * .1, vol1Count, iterations / 10);
+    assertEquals(iterations * .9, vol2Count, iterations / 10);
+  }
+
+  /**
+   * Makes {@code iterations} choices from {@code tableDirs} with a fresh chooser and tallies how
+   * often each volume was picked in {@code vol1Count} and {@code vol2Count}.
+   */
+  private void makeChoices() {
+    SpaceAwareVolumeChooser chooser = new SpaceAwareVolumeChooser();
+    for (int i = 0; i < iterations; i++) {
+      String choice = chooser.choose(chooserEnv, tableDirs);
+      if (choice.equals(volumeOne)) {
+        vol1Count += 1;
+      }
+
+      if (choice.equals(volumeTwo)) {
+        vol2Count += 1;
+      }
+    }
+  }
+}