You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/12/07 20:31:15 UTC
[accumulo] branch 2.1 updated: Allow user to enable FSDataOutputStream.setDropBehind on majc output (#3083)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 5b4fe693dd Allow user to enable FSDataOutputStream.setDropBehind on majc output (#3083)
5b4fe693dd is described below
commit 5b4fe693dd11d572f078620aa65e8b85726acdd5
Author: Dave Marion <dl...@apache.org>
AuthorDate: Wed Dec 7 15:31:09 2022 -0500
Allow user to enable FSDataOutputStream.setDropBehind on majc output (#3083)
This commit introduces a new property, table.compaction.major.output.drop.cache,
which defaults to false. When it is true, setDropBehind(true) is called on the major
compaction file output stream (except for the root and metadata tables).
---
.../org/apache/accumulo/core/conf/Property.java | 5 +++++
.../apache/accumulo/core/file/FileOperations.java | 5 +++++
.../accumulo/core/file/rfile/RFileOperations.java | 25 +++++++++++++++++++++-
.../accumulo/server/compaction/FileCompactor.java | 17 +++++++++++++--
4 files changed, 49 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b5faf85f14..4e7760c68e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -964,6 +964,11 @@ public enum Property {
"1.3.5"),
TABLE_ARBITRARY_PROP_PREFIX("table.custom.", null, PropertyType.PREFIX,
"Prefix to be used for user defined arbitrary properties.", "1.7.0"),
+ TABLE_MAJC_OUTPUT_DROP_CACHE("table.compaction.major.output.drop.cache", "false",
+ PropertyType.BOOLEAN, "Setting this property to true will call "
+ + "FSDataOutputStream.setDropBehind(true) on the major compaction output stream."
+ + " This property is ignored for the root and metadata tables.",
+ "2.1.1"),
TABLE_MAJC_RATIO("table.compaction.major.ratio", "3", PropertyType.FRACTION,
"Minimum ratio of total input size to maximum input RFile size for"
+ " running a major compaction. ",
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 2a2b4aeaa5..db82b0d149 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -390,6 +390,11 @@ public abstract class FileOperations {
return this;
}
+ public WriterBuilder dropCachesBehind() { // shorthand for dropCacheBehind(true)
+ this.dropCacheBehind(true);
+ return this; // returns this builder so calls can be chained
+ }
+
public FileSKVWriter build() throws IOException {
return openWriter(toWriterBuilderOptions(compression, outputStream, enableAccumuloStart));
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index beeeeda6ec..609c19550e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.file.rfile;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import org.apache.accumulo.core.client.sample.Sampler;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -36,14 +37,20 @@ import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
import org.apache.accumulo.core.sample.impl.SamplerFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
public class RFileOperations extends FileOperations {
+ private static final Logger LOG = LoggerFactory.getLogger(RFileOperations.class);
+
private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
private static RFile.Reader getReader(FileOptions options) throws IOException {
@@ -129,7 +136,23 @@ public class RFileOperations extends FileOperations {
String file = options.getFilename();
FileSystem fs = options.getFileSystem();
- outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
+ if (options.dropCacheBehind) {
+ EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+ outputStream = fs.create(new Path(file), FsPermission.getDefault(), set, bufferSize,
+ (short) rep, block, null);
+ try {
+ // Tell the DataNode that the file does not need to be cached in the OS page cache
+ outputStream.setDropBehind(Boolean.TRUE);
+ LOG.trace("Called setDropBehind(TRUE) for stream writing file {}", options.filename);
+ } catch (UnsupportedOperationException e) {
+ LOG.debug("setDropBehind not enabled for file: {}", options.filename);
+ } catch (IOException e) {
+ LOG.debug("IOException setting drop behind for file: {}, msg: {}", options.filename,
+ e.getMessage());
+ }
+ } else {
+ outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
+ }
}
BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression,
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index e083c5aa14..a7e879dc8b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -35,12 +35,14 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileOperations.WriterBuilder;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
@@ -50,6 +52,8 @@ import org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterato
import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator;
import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@ -212,9 +216,18 @@ public class FileCompactor implements Callable<CompactionStats> {
try {
FileOperations fileFactory = FileOperations.getInstance();
FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());
- mfw = fileFactory.newWriterBuilder()
+
+ boolean dropCacheBehindMajcOutput = !RootTable.ID.equals(this.extent.tableId())
+ && !MetadataTable.ID.equals(this.extent.tableId())
+ && acuTableConf.getBoolean(Property.TABLE_MAJC_OUTPUT_DROP_CACHE);
+
+ WriterBuilder outBuilder = fileFactory.newWriterBuilder()
.forFile(outputFile.getMetaInsert(), ns, ns.getConf(), cryptoService)
- .withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter()).build();
+ .withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter());
+ if (dropCacheBehindMajcOutput) {
+ outBuilder.dropCachesBehind();
+ }
+ mfw = outBuilder.build();
Map<String,Set<ByteSequence>> lGroups = getLocalityGroups(acuTableConf);