You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/11/20 00:31:48 UTC
[2/3] cassandra git commit: improve JBOD disk utilization
improve JBOD disk utilization
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2291a60e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2291a60e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2291a60e
Branch: refs/heads/trunk
Commit: 2291a60e9eded4486528acc0a8d12a062b21fc26
Parents: 4397c34
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Nov 19 16:17:05 2014 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 19 17:17:40 2014 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 5 -
.../org/apache/cassandra/db/Directories.java | 161 ++++++++++++-------
.../db/compaction/CompactionManager.java | 12 +-
.../cassandra/db/compaction/Scrubber.java | 5 +-
.../cassandra/io/util/DiskAwareRunnable.java | 14 +-
.../cassandra/service/StorageService.java | 20 ---
.../cassandra/streaming/StreamReader.java | 3 +-
.../cassandra/streaming/StreamReceiveTask.java | 8 +-
.../apache/cassandra/db/DirectoriesTest.java | 128 +++++++++++++++
10 files changed, 252 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 41a5aaf..e008ab9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
* Fix overflow on histogram computation (CASSANDRA-8028)
* Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
* Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
Merged from 2.0:
* Fix some failing queries that use multi-column relations
on COMPACT STORAGE tables (CASSANDRA-8264)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 7e1dd18..dec5370 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2240,11 +2240,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return directories.getSnapshotDetails();
}
- public boolean hasUnreclaimedSpace()
- {
- return getLiveDiskSpaceUsed() < getTotalDiskSpaceUsed();
- }
-
public long getTotalDiskSpaceUsed()
{
return metric.totalDiskSpaceUsed.count();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 4319481..eb33bd8 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -29,8 +29,7 @@ import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
@@ -39,8 +38,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;
-import com.google.common.primitives.Longs;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -96,7 +93,6 @@ public class Directories
dataDirectories[i] = new DataDirectory(new File(locations[i]));
}
-
/**
* Checks whether Cassandra has RWX permissions to the specified directory. Logs an error with
* the details if it does not.
@@ -198,7 +194,7 @@ public class Directories
for (int i = 0; i < dataDirectories.length; ++i)
{
// check if old SSTable directory exists
- dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, this.metadata.cfName));
+ dataPaths[i] = new File(dataDirectories[i].location, join(metadata.ksName, metadata.cfName));
}
boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
{
@@ -237,11 +233,10 @@ public class Directories
*/
public File getLocationForDisk(DataDirectory dataDirectory)
{
- for (File dir : dataPaths)
- {
- if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
- return dir;
- }
+ if (dataDirectory != null)
+ for (File dir : dataPaths)
+ if (dir.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
+ return dir;
return null;
}
@@ -255,65 +250,96 @@ public class Directories
return null;
}
+ /**
+ * Basically the same as calling {@link #getWriteableLocationAsFile(long)} with an unknown size ({@code -1L}),
+ * which may return any non-blacklisted directory - even a data directory that has no usable space.
+ * Do not use this method in production code.
+ *
+ * @throws IOError if all directories are blacklisted.
+ */
public File getDirectoryForNewSSTables()
{
- File path = getWriteableLocationAsFile();
-
- // Requesting GC has a chance to free space only if we're using mmap and a non SUN jvm
- if (path == null
- && (DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap || DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
- && !FileUtils.isCleanerAvailable())
- {
- logger.info("Forcing GC to free up disk space. Upgrade to the Oracle JVM to avoid this");
- StorageService.instance.requestGC();
- // retry after GCing has forced unmap of compacted SSTables so they can be deleted
- // Note: GCInspector will do this already, but only sun JVM supports GCInspector so far
- SSTableDeletingTask.rescheduleFailedTasks();
- Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
- path = getWriteableLocationAsFile();
- }
-
- return path;
+ return getWriteableLocationAsFile(-1L);
}
- public File getWriteableLocationAsFile()
+ /**
+ * Returns the location of a non-blacklisted data directory that _currently_ has at least {@code writeSize} bytes of usable space, or {@code null} if no non-blacklisted directory has enough space.
+ *
+ * @throws IOError if all directories are blacklisted.
+ */
+ public File getWriteableLocationAsFile(long writeSize)
{
- return getLocationForDisk(getWriteableLocation());
+ return getLocationForDisk(getWriteableLocation(writeSize));
}
/**
- * @return a non-blacklisted directory with the most free space and least current tasks.
+ * Returns a non-blacklisted data directory that _currently_ has at least {@code writeSize} bytes of usable space, or {@code null} if no non-blacklisted directory has enough space.
*
* @throws IOError if all directories are blacklisted.
*/
- public DataDirectory getWriteableLocation()
+ public DataDirectory getWriteableLocation(long writeSize)
{
- List<DataDirectory> candidates = new ArrayList<>();
+ List<DataDirectoryCandidate> candidates = new ArrayList<>();
+
+ long totalAvailable = 0L;
// pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
+ boolean tooBig = false;
for (DataDirectory dataDir : dataDirectories)
{
if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
continue;
- candidates.add(dataDir);
+ DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
+ // exclude the directory if writeSize exceeds its currently available space
+ if (candidate.availableSpace < writeSize)
+ {
+ tooBig = true;
+ continue;
+ }
+ candidates.add(candidate);
+ totalAvailable += candidate.availableSpace;
}
if (candidates.isEmpty())
- throw new IOError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"));
+ if (tooBig)
+ return null;
+ else
+ throw new IOError(new IOException("All configured data directories have been blacklisted as unwritable for erroring out"));
- // sort directories by free space, in _descending_ order.
- Collections.sort(candidates);
+ // shortcut for single data directory systems
+ if (candidates.size() == 1)
+ return candidates.get(0).dataDirectory;
+
+ sortWriteableCandidates(candidates, totalAvailable);
- // sort directories by load, in _ascending_ order.
- Collections.sort(candidates, new Comparator<DataDirectory>()
+ return pickWriteableDirectory(candidates);
+ }
+
+ // separated for unit testing
+ static DataDirectory pickWriteableDirectory(List<DataDirectoryCandidate> candidates)
+ {
+ // weighted random
+ double rnd = ThreadLocalRandom.current().nextDouble();
+ for (DataDirectoryCandidate candidate : candidates)
{
- public int compare(DataDirectory a, DataDirectory b)
- {
- return a.currentTasks.get() - b.currentTasks.get();
- }
- });
+ rnd -= candidate.perc;
+ if (rnd <= 0)
+ return candidate.dataDirectory;
+ }
+
+ // last resort
+ return candidates.get(0).dataDirectory;
+ }
+
+ // separated for unit testing
+ static void sortWriteableCandidates(List<DataDirectoryCandidate> candidates, long totalAvailable)
+ {
+ // calculate free-space-percentage
+ for (DataDirectoryCandidate candidate : candidates)
+ candidate.calcFreePerc(totalAvailable);
- return candidates.get(0);
+ // sort directories by perc
+ Collections.sort(candidates);
}
public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
@@ -336,31 +362,50 @@ public class Directories
return new SSTableLister();
}
- public static class DataDirectory implements Comparable<DataDirectory>
+ public static class DataDirectory
{
public final File location;
- public final AtomicInteger currentTasks = new AtomicInteger();
- public final AtomicLong estimatedWorkingSize = new AtomicLong();
public DataDirectory(File location)
{
this.location = location;
}
- /**
- * @return estimated available disk space for bounded directory,
- * excluding the expected size written by tasks in the queue.
- */
- public long getEstimatedAvailableSpace()
+ public long getAvailableSpace()
{
- // Load factor of 0.9 we do not want to use the entire disk that is too risky.
- return location.getUsableSpace() - estimatedWorkingSize.get();
+ return location.getUsableSpace();
}
+ }
+
+ static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
+ {
+ final DataDirectory dataDirectory;
+ final long availableSpace;
+ double perc;
- public int compareTo(DataDirectory o)
+ public DataDirectoryCandidate(DataDirectory dataDirectory)
{
- // we want to sort by free space in descending order
- return -1 * Longs.compare(getEstimatedAvailableSpace(), o.getEstimatedAvailableSpace());
+ this.dataDirectory = dataDirectory;
+ this.availableSpace = dataDirectory.getAvailableSpace();
+ }
+
+ void calcFreePerc(long totalAvailableSpace)
+ {
+ double w = availableSpace;
+ w /= totalAvailableSpace;
+ perc = w;
+ }
+
+ public int compareTo(DataDirectoryCandidate o)
+ {
+ if (this == o)
+ return 0;
+
+ int r = Double.compare(perc, o.perc);
+ if (r != 0)
+ return -r;
+ // last resort
+ return System.identityHashCode(this) - System.identityHashCode(o);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 272b533..61628ff 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -658,9 +658,11 @@ public class CompactionManager implements CompactionManagerMBean
{
assert !cfs.isIndex();
+ Set<SSTableReader> sstableSet = Collections.singleton(sstable);
+
if (!hasIndexes && !new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
{
- cfs.getDataTracker().markCompactedSSTablesReplaced(Arrays.asList(sstable), Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
+ cfs.getDataTracker().markCompactedSSTablesReplaced(sstableSet, Collections.<SSTableReader>emptyList(), OperationType.CLEANUP);
return;
}
if (!needsCleanup(sstable, ranges))
@@ -674,13 +676,13 @@ public class CompactionManager implements CompactionManagerMBean
long totalkeysWritten = 0;
int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(),
- (int) (SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))));
+ (int) (SSTableReader.getApproximateKeyCount(sstableSet)));
if (logger.isDebugEnabled())
logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize);
logger.info("Cleaning up {}", sstable);
- File compactionFileLocation = cfs.directories.getDirectoryForNewSSTables();
+ File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableSet, OperationType.CLEANUP));
if (compactionFileLocation == null)
throw new IOException("disk full");
@@ -691,7 +693,7 @@ public class CompactionManager implements CompactionManagerMBean
Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
List<SSTableReader> finished;
- try (CompactionController controller = new CompactionController(cfs, Collections.singleton(sstable), getDefaultGcBefore(cfs)))
+ try (CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs)))
{
writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
@@ -994,7 +996,7 @@ public class CompactionManager implements CompactionManagerMBean
Set<SSTableReader> sstableAsSet = new HashSet<>();
sstableAsSet.add(sstable);
- File destination = cfs.directories.getDirectoryForNewSSTables();
+ File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0cd71f2..2f53ab9 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -81,12 +81,13 @@ public class Scrubber implements Closeable
this.skipCorrupted = skipCorrupted;
this.isOffline = isOffline;
+ List<SSTableReader> toScrub = Collections.singletonList(sstable);
+
// Calculate the expected compacted filesize
- this.destination = cfs.directories.getDirectoryForNewSSTables();
+ this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
if (destination == null)
throw new IOException("disk full");
- List<SSTableReader> toScrub = Collections.singletonList(sstable);
// If we run scrub offline, we should never purge tombstone, as we cannot know if other sstable have data that the tombstone deletes.
this.controller = isOffline
? new ScrubController(cfs)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 198a88d..6d453e5 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -34,24 +34,14 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
while (true)
{
writeSize = getExpectedWriteSize();
- directory = getDirectories().getWriteableLocation();
+ directory = getDirectories().getWriteableLocation(writeSize);
if (directory != null || !reduceScopeForLimitedSpace())
break;
}
if (directory == null)
throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
- directory.currentTasks.incrementAndGet();
- directory.estimatedWorkingSize.addAndGet(writeSize);
- try
- {
- runWith(getDirectories().getLocationForDisk(directory));
- }
- finally
- {
- directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
- directory.currentTasks.decrementAndGet();
- }
+ runWith(getDirectories().getLocationForDisk(directory));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index ae8c798..79cea8e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3537,26 +3537,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return isClientMode;
}
- public synchronized void requestGC()
- {
- if (hasUnreclaimedSpace())
- {
- logger.info("requesting GC to free disk space");
- System.gc();
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
- }
-
- private boolean hasUnreclaimedSpace()
- {
- for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- {
- if (cfs.hasUnreclaimedSpace())
- return true;
- }
- return false;
- }
-
public String getOperationMode()
{
return operationMode.toString();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 3014549..c96a925 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.messages.FileMessageHeader;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -116,7 +115,7 @@ public class StreamReader
protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt) throws IOException
{
- Directories.DataDirectory localDir = cfs.directories.getWriteableLocation();
+ Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
if (localDir == null)
throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
desc = Descriptor.fromFilename(cfs.getTempSSTablePath(cfs.directories.getLocationForDisk(localDir)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 33da3d1..aa18954 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -17,6 +17,9 @@
*/
package org.apache.cassandra.streaming;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -113,7 +116,10 @@ public class StreamReceiveTask extends StreamTask
}
ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
- StreamLockfile lockfile = new StreamLockfile(cfs.directories.getWriteableLocationAsFile(), UUID.randomUUID());
+ File lockfiledir = cfs.directories.getWriteableLocationAsFile(task.sstables.size() * 256);
+ if (lockfiledir == null)
+ throw new IOError(new IOException("All disks full"));
+ StreamLockfile lockfile = new StreamLockfile(lockfiledir, UUID.randomUUID());
lockfile.create(task.sstables);
List<SSTableReader> readers = new ArrayList<>();
for (SSTableWriter writer : task.sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2291a60e/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 9e6b26b..34d10d2 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,7 +45,10 @@ import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class DirectoriesTest
{
@@ -238,4 +242,128 @@ public class DirectoriesTest
}
}
}
+
+ @Test
+ public void testDiskFreeSpace()
+ {
+ DataDirectory[] dataDirectories = new DataDirectory[]
+ {
+ new DataDirectory(new File("/nearlyFullDir1"))
+ {
+ public long getAvailableSpace()
+ {
+ return 11L;
+ }
+ },
+ new DataDirectory(new File("/nearlyFullDir2"))
+ {
+ public long getAvailableSpace()
+ {
+ return 10L;
+ }
+ },
+ new DataDirectory(new File("/uniformDir1"))
+ {
+ public long getAvailableSpace()
+ {
+ return 1000L;
+ }
+ },
+ new DataDirectory(new File("/uniformDir2"))
+ {
+ public long getAvailableSpace()
+ {
+ return 999L;
+ }
+ },
+ new DataDirectory(new File("/veryFullDir"))
+ {
+ public long getAvailableSpace()
+ {
+ return 4L;
+ }
+ }
+ };
+
+ // directories should be sorted
+ // 1. by their free space ratio
+ // before weighted random is applied
+ List<Directories.DataDirectoryCandidate> candidates = getWriteableDirectories(dataDirectories, 0L);
+ assertSame(dataDirectories[2], candidates.get(0).dataDirectory); // available: 1000
+ assertSame(dataDirectories[3], candidates.get(1).dataDirectory); // available: 999
+ assertSame(dataDirectories[0], candidates.get(2).dataDirectory); // available: 11
+ assertSame(dataDirectories[1], candidates.get(3).dataDirectory); // available: 10
+
+ // check for writeSize == 5
+ Map<DataDirectory, DataDirectory> testMap = new IdentityHashMap<>();
+ for (int i=0; ; i++)
+ {
+ candidates = getWriteableDirectories(dataDirectories, 5L);
+ assertEquals(4, candidates.size());
+
+ DataDirectory dir = Directories.pickWriteableDirectory(candidates);
+ testMap.put(dir, dir);
+
+ assertFalse(testMap.size() > 4);
+ if (testMap.size() == 4)
+ {
+ // at least (rule of thumb) 100 iterations to see whether there are more (wrong) directories returned
+ if (i >= 100)
+ break;
+ }
+
+ // random weighted writeable directory algorithm fails to return all possible directories after
+ // many tries
+ if (i >= 10000000)
+ fail();
+ }
+
+ // check for writeSize == 11
+ testMap.clear();
+ for (int i=0; ; i++)
+ {
+ candidates = getWriteableDirectories(dataDirectories, 11L);
+ assertEquals(3, candidates.size());
+ for (Directories.DataDirectoryCandidate candidate : candidates)
+ assertTrue(candidate.dataDirectory.getAvailableSpace() >= 11L);
+
+ DataDirectory dir = Directories.pickWriteableDirectory(candidates);
+ testMap.put(dir, dir);
+
+ assertFalse(testMap.size() > 3);
+ if (testMap.size() == 3)
+ {
+ // at least (rule of thumb) 100 iterations
+ if (i >= 100)
+ break;
+ }
+
+ // random weighted writeable directory algorithm fails to return all possible directories after
+ // many tries
+ if (i >= 10000000)
+ fail();
+ }
+ }
+
+ private List<Directories.DataDirectoryCandidate> getWriteableDirectories(DataDirectory[] dataDirectories, long writeSize)
+ {
+ // copied from Directories.getWriteableLocation(long)
+ List<Directories.DataDirectoryCandidate> candidates = new ArrayList<>();
+
+ long totalAvailable = 0L;
+
+ for (DataDirectory dataDir : dataDirectories)
+ {
+ Directories.DataDirectoryCandidate candidate = new Directories.DataDirectoryCandidate(dataDir);
+ // exclude the directory if writeSize exceeds its currently available space
+ if (candidate.availableSpace < writeSize)
+ continue;
+ candidates.add(candidate);
+ totalAvailable += candidate.availableSpace;
+ }
+
+ Directories.sortWriteableCandidates(candidates, totalAvailable);
+
+ return candidates;
+ }
}