You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/10/02 14:49:46 UTC
[accumulo] branch master updated: Add space aware volume chooser
(#645)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 5b50114 Add space aware volume chooser (#645)
5b50114 is described below
commit 5b5011410145b4a68365f42a566983bc1834877b
Author: alerman <al...@gmail.com>
AuthorDate: Tue Oct 2 10:49:38 2018 -0400
Add space aware volume chooser (#645)
---
.../server/fs/SpaceAwareVolumeChooser.java | 143 +++++++++++++
.../server/fs/SpaceAwareVolumeChooserTest.java | 222 +++++++++++++++++++++
2 files changed, 365 insertions(+)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java b/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java
new file mode 100644
index 0000000..2982459
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooser.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+/**
+ * A {@link PreferredVolumeChooser} that takes remaining HDFS space into account when making a
+ * volume choice rather than a simpler round robin. The list of volumes to use can be limited using
+ * the same properties as {@link PreferredVolumeChooser}.
+ *
+ * <p>
+ * Each candidate volume is weighted by its fraction of free space (remaining / capacity) and one
+ * volume is picked at random in proportion to those weights. The weights are cached per list of
+ * candidate volumes and expire {@link #HDFS_SPACE_RECOMPUTE_INTERVAL} milliseconds after being
+ * computed (5 minutes when that property is unset or blank).
+ */
+public class SpaceAwareVolumeChooser extends PreferredVolumeChooser {
+
+  public static final String HDFS_SPACE_RECOMPUTE_INTERVAL = Property.GENERAL_ARBITRARY_PROP_PREFIX
+      .getKey() + "spaceaware.volume.chooser.recompute.interval";
+
+  // Default time, in ms, that computed weights stay cached. Defaults to 5 min
+  private static final long DEFAULT_COMPUTATION_CACHE_DURATION = 300000;
+
+  // Built lazily by getCache(); keyed by the list of candidate volumes
+  LoadingCache<List<String>,WeightedRandomCollection> choiceCache = null;
+
+  private static final Logger log = LoggerFactory.getLogger(SpaceAwareVolumeChooser.class);
+
+  /**
+   * Chooses one of the preferred volumes at random, weighted by each volume's free space.
+   *
+   * @param env
+   *          the environment the choice is made for; used to look up preferred volumes,
+   *          configuration and the volume manager
+   * @param options
+   *          the candidate volumes, narrowed by {@code getPreferredVolumes} before weighting
+   * @return the chosen volume
+   * @throws VolumeChooserException
+   *           as declared by the overridden method
+   * @throws IllegalStateException
+   *           if computing the weights fails with a checked exception. Unchecked failures raised
+   *           while computing the weights (for example no usable volume) are not caught here and
+   *           propagate as thrown by the cache.
+   */
+  @Override
+  public String choose(VolumeChooserEnvironment env, String[] options)
+      throws VolumeChooserException {
+
+    String[] preferred = getPreferredVolumes(env, options);
+
+    // Key the cache on a private copy: Arrays.asList is only a view, so a later write to the
+    // array would otherwise change the hash of a key already stored in the cache.
+    List<String> cacheKey = Arrays.asList(preferred.clone());
+
+    try {
+      return getCache(env).get(cacheKey).next();
+    } catch (ExecutionException e) {
+      throw new IllegalStateException("Execution exception when attempting to cache choice", e);
+    }
+  }
+
+  /**
+   * Returns the weight cache, creating it on first use. The expiry interval is read once, from
+   * the system configuration, when the cache is created.
+   *
+   * @param env
+   *          environment used to load the configuration and, inside the loader, to reach the
+   *          volume manager
+   * @return the shared cache of weighted collections
+   * @throws NumberFormatException
+   *           if the configured interval is not blank and is not a valid long
+   */
+  private synchronized LoadingCache<List<String>,WeightedRandomCollection> getCache(
+      VolumeChooserEnvironment env) {
+
+    if (choiceCache == null) {
+      ServerConfigurationFactory scf = loadConfFactory(env);
+      AccumuloConfiguration systemConfiguration = scf.getSystemConfiguration();
+      String propertyValue = systemConfiguration.get(HDFS_SPACE_RECOMPUTE_INTERVAL);
+
+      // Trim so that surrounding whitespace in the configured value does not fail parsing
+      long computationCacheDuration = StringUtils.isNotBlank(propertyValue)
+          ? Long.parseLong(propertyValue.trim())
+          : DEFAULT_COMPUTATION_CACHE_DURATION;
+
+      // NOTE(review): the loader captures the env of the call that creates the cache, so every
+      // later load reuses that env's server context - presumably the same one for the lifetime
+      // of the process; confirm.
+      choiceCache = CacheBuilder.newBuilder()
+          .expireAfterWrite(computationCacheDuration, TimeUnit.MILLISECONDS)
+          .build(new CacheLoader<List<String>,WeightedRandomCollection>() {
+            @Override
+            public WeightedRandomCollection load(List<String> key) {
+              return new WeightedRandomCollection(key, env);
+            }
+          });
+    }
+
+    return choiceCache;
+  }
+
+  /**
+   * A set of volumes from which one is drawn at random with probability proportional to its
+   * weight. Weights are stored as running totals in a sorted map so a draw is a single lookup.
+   */
+  public class WeightedRandomCollection {
+    // Maps the cumulative weight up to and including an entry to that entry's volume
+    private final NavigableMap<Double,String> map = new TreeMap<>();
+    private final Random random;
+    private double total = 0;
+
+    /**
+     * Builds the collection by querying the file system status of every option.
+     *
+     * @param options
+     *          the candidate volumes; must not be empty
+     * @param env
+     *          environment used to obtain the volume manager
+     * @throws IllegalStateException
+     *           if {@code options} is empty or no option ends up with a usable weight
+     */
+    public WeightedRandomCollection(List<String> options, VolumeChooserEnvironment env) {
+      this.random = new Random();
+
+      if (options.size() < 1) {
+        throw new IllegalStateException("Options was empty! No valid volumes to choose from.");
+      }
+
+      VolumeManager manager = env.getServerContext().getVolumeManager();
+
+      // Compute percentage space available on each volume
+      for (String option : options) {
+        FileSystem pathFs = manager.getVolumeByPath(new Path(option)).getFileSystem();
+        try {
+          FsStatus optionStatus = pathFs.getStatus();
+          // A zero capacity yields NaN or Infinity here; add() rejects both
+          double percentFree = ((double) optionStatus.getRemaining() / optionStatus.getCapacity());
+          add(percentFree, option);
+        } catch (IOException e) {
+          // Best effort: a volume whose status cannot be read is left out of the choice
+          log.error("Unable to get file system status for {}", option, e);
+        }
+      }
+
+      if (map.size() < 1) {
+        throw new IllegalStateException(
+            "Weighted options was empty! Could indicate an issue getting file system status or "
+                + "no free space on any volume");
+      }
+    }
+
+    /**
+     * Adds an entry with the given weight. Weights that are zero, negative, NaN or infinite are
+     * skipped, since any of them would make the running total unusable for drawing.
+     *
+     * @param weight
+     *          relative likelihood of {@code result} being drawn
+     * @param result
+     *          the value to return when this entry is drawn
+     * @return this collection, to allow chaining
+     */
+    public WeightedRandomCollection add(double weight, String result) {
+      // "!(weight > 0)" is also true for NaN, which "weight <= 0" would let through
+      if (!(weight > 0) || Double.isInfinite(weight)) {
+        log.info("Weight was {}. Not adding {}", weight, result);
+        return this;
+      }
+      total += weight;
+      map.put(total, result);
+      return this;
+    }
+
+    /**
+     * Draws one entry at random, with probability proportional to its weight.
+     *
+     * @return the drawn value
+     */
+    public String next() {
+      // value is in [0, total), so the first key strictly above it always exists
+      double value = random.nextDouble() * total;
+      return map.higherEntry(value).getValue();
+    }
+  }
+}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java b/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java
new file mode 100644
index 0000000..1b4f032
--- /dev/null
+++ b/server/base/src/test/java/org/apache/accumulo/server/fs/SpaceAwareVolumeChooserTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.fs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.Path;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+/**
+ * Tests for {@link SpaceAwareVolumeChooser}. Two mocked volumes report configurable remaining
+ * space, the chooser is asked for a volume {@code iterations} times, and the number of times each
+ * volume was picked is compared with the expected share.
+ */
+public class SpaceAwareVolumeChooserTest {
+ VolumeManager volumeManager = null;
+ VolumeChooserEnvironment chooserEnv = null;
+ ServerContext serverContext = null;
+ ServerConfigurationFactory serverConfigurationFactory = null;
+ AccumuloConfiguration sysConfig = null;
+ Volume vol1 = null;
+ Volume vol2 = null;
+ FileSystem fs1 = null;
+ FileSystem fs2 = null;
+ FsStatus status1 = null;
+ FsStatus status2 = null;
+
+ // Number of choices made by makeChoices(); also drives the mock call-count bounds
+ int iterations = 1000;
+
+ String volumeOne = "hdfs://nn1:8020/apps/accumulo1/tables";
+ String volumeTwo = "hdfs://nn2:8020/applications/accumulo/tables";
+
+ // Different volumes with different paths
+ String[] tableDirs = {volumeOne, volumeTwo};
+
+ // How many times each volume was returned by the chooser during makeChoices()
+ int vol1Count = 0;
+ int vol2Count = 0;
+
+ /**
+ * Creates fresh mocks and a DEFAULT-scope chooser environment backed by the mocked server
+ * context. Expectations are recorded later by testSpecificSetup.
+ */
+ @Before
+ public void beforeTest() {
+ volumeManager = EasyMock.createMock(VolumeManager.class);
+ serverContext = EasyMock.createMock(ServerContext.class);
+ serverConfigurationFactory = EasyMock.createMock(ServerConfigurationFactory.class);
+ sysConfig = EasyMock.createMock(AccumuloConfiguration.class);
+ vol1 = EasyMock.createMock(Volume.class);
+ vol2 = EasyMock.createMock(Volume.class);
+ fs1 = EasyMock.createMock(FileSystem.class);
+ fs2 = EasyMock.createMock(FileSystem.class);
+ status1 = EasyMock.createMock(FsStatus.class);
+ status2 = EasyMock.createMock(FsStatus.class);
+ chooserEnv = new VolumeChooserEnvironment(VolumeChooserEnvironment.ChooserScope.DEFAULT,
+ serverContext);
+
+ }
+
+ /**
+ * Records the mock expectations for one test and switches all mocks to replay mode.
+ *
+ * @param percentage1
+ * value returned by volume 1 for getRemaining(); capacity is fixed at 100, so this is the
+ * percentage of free space
+ * @param percentage2
+ * same as percentage1, for volume 2
+ * @param cacheDuration
+ * value returned for the recompute-interval property; null leaves the chooser on its
+ * default
+ * @param timesToCallPreferredVolumeChooser
+ * exact number of times the preferred-volumes property is expected to be read
+ * @param anyTimes
+ * when true, the upper bound for the configuration lookups is set to max + 1 instead of
+ * timesToCallPreferredVolumeChooser + iterations
+ */
+ private void testSpecificSetup(long percentage1, long percentage2, String cacheDuration,
+ int timesToCallPreferredVolumeChooser, boolean anyTimes) throws IOException {
+ // Lower and upper bounds on how often each file-system related mock may be called
+ int max = iterations + 1;
+ int min = 1;
+ int updatePropertyMax = timesToCallPreferredVolumeChooser + iterations;
+ if (anyTimes) {
+ // NOTE(review): max already holds this value; only updatePropertyMax actually changes
+ max = iterations + 1;
+ updatePropertyMax = max + 1;
+ }
+ // Volume 1 reports percentage1 remaining out of a capacity of 100, i.e. percentage1 % free
+ EasyMock.expect(status1.getRemaining()).andReturn(percentage1).times(min, max);
+ EasyMock.expect(status1.getCapacity()).andReturn(100L).times(min, max);
+
+ // Volume 2 reports percentage2 remaining out of a capacity of 100, i.e. percentage2 % free
+ EasyMock.expect(status2.getRemaining()).andReturn(percentage2).times(min, max);
+ EasyMock.expect(status2.getCapacity()).andReturn(100L).times(min, max);
+
+ // The recompute interval is expected to be read exactly once
+ EasyMock.expect(sysConfig.get(SpaceAwareVolumeChooser.HDFS_SPACE_RECOMPUTE_INTERVAL))
+ .andReturn(cacheDuration).times(1);
+ // Both test volumes are configured as the preferred volumes for the DEFAULT scope
+ EasyMock
+ .expect(sysConfig.get(PreferredVolumeChooser
+ .getPropertyNameForScope(VolumeChooserEnvironment.ChooserScope.DEFAULT)))
+ .andReturn(String.join(",", tableDirs)).times(timesToCallPreferredVolumeChooser);
+
+ EasyMock.expect(serverContext.getVolumeManager()).andReturn(volumeManager).times(min,
+ Math.max(max, updatePropertyMax));
+ EasyMock.expect(serverContext.getServerConfFactory()).andReturn(serverConfigurationFactory)
+ .times(min, updatePropertyMax);
+ EasyMock.expect(serverConfigurationFactory.getSystemConfiguration()).andReturn(sysConfig)
+ .times(1, updatePropertyMax);
+
+ // Wire each volume path to its mocked Volume, FileSystem and FsStatus
+ EasyMock.expect(volumeManager.getVolumeByPath(new Path(volumeOne))).andReturn(vol1).times(min,
+ max);
+ EasyMock.expect(volumeManager.getVolumeByPath(new Path(volumeTwo))).andReturn(vol2).times(min,
+ max);
+ EasyMock.expect(vol1.getFileSystem()).andReturn(fs1).times(min, max);
+ EasyMock.expect(vol2.getFileSystem()).andReturn(fs2).times(min, max);
+ EasyMock.expect(fs1.getStatus()).andReturn(status1).times(min, max);
+ EasyMock.expect(fs2.getStatus()).andReturn(status2).times(min, max);
+
+ EasyMock.replay(serverContext, vol1, vol2, fs1, fs2, status1, status2, volumeManager,
+ serverConfigurationFactory, sysConfig);
+ }
+
+ /**
+ * Verifies that every mock was called within its recorded bounds, then clears the mocks and
+ * resets the per-volume counters.
+ */
+ @After
+ public void afterTest() {
+
+ EasyMock.verify(serverContext, vol1, vol2, fs1, fs2, status1, status2, volumeManager,
+ serverConfigurationFactory, sysConfig);
+
+ volumeManager = null;
+ serverContext = null;
+ vol1 = null;
+ vol2 = null;
+ fs1 = null;
+ fs2 = null;
+ status1 = null;
+ status2 = null;
+ vol1Count = 0;
+ vol2Count = 0;
+ }
+
+ /**
+ * Equal free space on both volumes with the default cache interval: each volume should receive
+ * about half of the choices, within a tolerance of iterations / 10.
+ */
+ @Test
+ public void testEvenWeightsWithCaching() throws IOException {
+
+ testSpecificSetup(10L, 10L, null, iterations, false);
+
+ makeChoices();
+
+ assertEquals(iterations / 2, vol1Count, iterations / 10);
+ assertEquals(iterations / 2, vol2Count, iterations / 10);
+
+ }
+
+ /**
+ * Equal free space on both volumes with the recompute interval set to "0" (presumably making
+ * cached weights expire immediately - confirm against the cache documentation): each volume
+ * should still receive about half of the choices.
+ */
+ @Test
+ public void testEvenWeightsNoCaching() throws IOException {
+
+ testSpecificSetup(10L, 10L, "0", iterations, true);
+
+ makeChoices();
+
+ assertEquals(iterations / 2, vol1Count, iterations / 10);
+ assertEquals(iterations / 2, vol2Count, iterations / 10);
+
+ }
+
+ /**
+ * Both volumes report no remaining space: choosing is expected to fail with an
+ * UncheckedExecutionException.
+ */
+ @Test(expected = UncheckedExecutionException.class)
+ public void testNoFreeSpace() throws IOException {
+
+ testSpecificSetup(0L, 0L, null, 1, false);
+
+ makeChoices();
+ }
+
+ /**
+ * Volume 1 is 90% free and volume 2 is 10% free: the choices should split roughly 90/10,
+ * within a tolerance of iterations / 10.
+ */
+ @Test
+ public void testNinetyTen() throws IOException {
+
+ testSpecificSetup(90L, 10L, null, iterations, false);
+
+ makeChoices();
+
+ assertEquals(iterations * .9, vol1Count, iterations / 10);
+ assertEquals(iterations * .1, vol2Count, iterations / 10);
+
+ }
+
+ /**
+ * Volume 1 is 10% free and volume 2 is 90% free: the choices should split roughly 10/90,
+ * within a tolerance of iterations / 10.
+ */
+ @Test
+ public void testTenNinety() throws IOException {
+
+ testSpecificSetup(10L, 90L, null, iterations, false);
+
+ makeChoices();
+
+ assertEquals(iterations * .1, vol1Count, iterations / 10);
+ assertEquals(iterations * .9, vol2Count, iterations / 10);
+
+ }
+
+ /**
+ * Same 10/90 split as testTenNinety, but with the recompute interval set to "0".
+ */
+ @Test
+ public void testWithNoCaching() throws IOException {
+
+ testSpecificSetup(10L, 90L, "0", iterations, true);
+
+ makeChoices();
+
+ assertEquals(iterations * .1, vol1Count, iterations / 10);
+ assertEquals(iterations * .9, vol2Count, iterations / 10);
+
+ }
+
+ /**
+ * Asks a new SpaceAwareVolumeChooser for a volume {@code iterations} times and counts how often
+ * each of the two test volumes is returned.
+ */
+ private void makeChoices() {
+ SpaceAwareVolumeChooser chooser = new SpaceAwareVolumeChooser();
+ for (int i = 0; i < iterations; i++) {
+ String choice = chooser.choose(chooserEnv, tableDirs);
+ if (choice.equals(volumeOne)) {
+ vol1Count += 1;
+ }
+
+ if (choice.equals(volumeTwo)) {
+ vol2Count += 1;
+ }
+ }
+
+ }
+}