Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/12/07 20:31:15 UTC

[accumulo] branch 2.1 updated: Allow user to enable FSDataOutputStream.setDropBehind on majc output (#3083)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 5b4fe693dd Allow user to enable FSDataOutputStream.setDropBehind on majc output (#3083)
5b4fe693dd is described below

commit 5b4fe693dd11d572f078620aa65e8b85726acdd5
Author: Dave Marion <dl...@apache.org>
AuthorDate: Wed Dec 7 15:31:09 2022 -0500

    Allow user to enable FSDataOutputStream.setDropBehind on majc output (#3083)
    
    This commit introduces a new property, table.compaction.major.output.drop.cache,
    which defaults to false. When set to true, setDropBehind is called on the major
    compaction file output stream.
---
 .../org/apache/accumulo/core/conf/Property.java    |  5 +++++
 .../apache/accumulo/core/file/FileOperations.java  |  5 +++++
 .../accumulo/core/file/rfile/RFileOperations.java  | 25 +++++++++++++++++++++-
 .../accumulo/server/compaction/FileCompactor.java  | 17 +++++++++++++--
 4 files changed, 49 insertions(+), 3 deletions(-)
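
For context, a minimal sketch of how a user could turn this on for a table via the
Accumulo client API (the client.properties path and table name below are placeholders,
not part of this commit); the equivalent shell command would be
"config -t <table> -s table.compaction.major.output.drop.cache=true".

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;

    public class EnableDropCacheExample {
      public static void main(String[] args) throws Exception {
        // "client.properties" and "mytable" are placeholders for this sketch
        try (AccumuloClient client =
            Accumulo.newClient().from("client.properties").build()) {
          client.tableOperations().setProperty("mytable",
              "table.compaction.major.output.drop.cache", "true");
        }
      }
    }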

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b5faf85f14..4e7760c68e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -964,6 +964,11 @@ public enum Property {
       "1.3.5"),
   TABLE_ARBITRARY_PROP_PREFIX("table.custom.", null, PropertyType.PREFIX,
       "Prefix to be used for user defined arbitrary properties.", "1.7.0"),
+  TABLE_MAJC_OUTPUT_DROP_CACHE("table.compaction.major.output.drop.cache", "false",
+      PropertyType.BOOLEAN,
+      "Setting this property to true will call"
+          + "FSDataOutputStream.setDropBehind(true) on the major compaction output stream.",
+      "2.1.1"),
   TABLE_MAJC_RATIO("table.compaction.major.ratio", "3", PropertyType.FRACTION,
       "Minimum ratio of total input size to maximum input RFile size for"
           + " running a major compaction. ",
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 2a2b4aeaa5..db82b0d149 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -390,6 +390,11 @@ public abstract class FileOperations {
       return this;
     }
 
+    public WriterBuilder dropCachesBehind() {
+      this.dropCacheBehind(true);
+      return this;
+    }
+
     public FileSKVWriter build() throws IOException {
       return openWriter(toWriterBuilderOptions(compression, outputStream, enableAccumuloStart));
     }
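
As a usage sketch (the helper class and method below are illustrative and not part of
this commit), a caller opts in by chaining the new dropCachesBehind() method on the
builder, mirroring the FileCompactor change in the last hunk of this diff; import
paths are as I understand the 2.1 source tree:

    import java.io.IOException;

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.file.FileOperations;
    import org.apache.accumulo.core.file.FileSKVWriter;
    import org.apache.accumulo.core.spi.crypto.CryptoService;
    import org.apache.hadoop.fs.FileSystem;

    class DropCacheWriterSketch {
      // Illustrative helper: callers pass the same arguments FileCompactor does below
      static FileSKVWriter openWriter(String file, FileSystem fs,
          AccumuloConfiguration tableConf, CryptoService cryptoService) throws IOException {
        return FileOperations.getInstance().newWriterBuilder()
            .forFile(file, fs, fs.getConf(), cryptoService)
            .withTableConfiguration(tableConf)
            .dropCachesBehind() // opt in to setDropBehind on the underlying output stream
            .build();
      }
    }
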
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index beeeeda6ec..609c19550e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.file.rfile;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 
 import org.apache.accumulo.core.client.sample.Sampler;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -36,14 +37,20 @@ import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
 public class RFileOperations extends FileOperations {
 
+  private static final Logger LOG = LoggerFactory.getLogger(RFileOperations.class);
+
   private static final Collection<ByteSequence> EMPTY_CF_SET = Collections.emptySet();
 
   private static RFile.Reader getReader(FileOptions options) throws IOException {
@@ -129,7 +136,23 @@ public class RFileOperations extends FileOperations {
       String file = options.getFilename();
       FileSystem fs = options.getFileSystem();
 
-      outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
+      if (options.dropCacheBehind) {
+        EnumSet<CreateFlag> set = EnumSet.of(CreateFlag.SYNC_BLOCK, CreateFlag.CREATE);
+        outputStream = fs.create(new Path(file), FsPermission.getDefault(), set, bufferSize,
+            (short) rep, block, null);
+        try {
+          // Tell the DataNode that the file does not need to be cached in the OS page cache
+          outputStream.setDropBehind(Boolean.TRUE);
+          LOG.trace("Called setDropBehind(TRUE) for stream writing file {}", options.filename);
+        } catch (UnsupportedOperationException e) {
+          LOG.debug("setDropBehind not enabled for file: {}", options.filename);
+        } catch (IOException e) {
+          LOG.debug("IOException setting drop behind for file: {}, msg: {}", options.filename,
+              e.getMessage());
+        }
+      } else {
+        outputStream = fs.create(new Path(file), false, bufferSize, (short) rep, block);
+      }
     }
 
     BCFile.Writer _cbw = new BCFile.Writer(outputStream, options.getRateLimiter(), compression,
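
The mechanism used above is Hadoop's CanSetDropBehind capability on FSDataOutputStream.
A standalone sketch of the same pattern against a plain Hadoop FileSystem, independent
of the RFile plumbing (the output path and data written are placeholders):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DropBehindDemo {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // "/tmp/drop-behind-demo" is a placeholder path for this sketch
        try (FSDataOutputStream out = fs.create(new Path("/tmp/drop-behind-demo"))) {
          try {
            // Advise the filesystem not to retain written pages in the OS page cache
            out.setDropBehind(Boolean.TRUE);
          } catch (UnsupportedOperationException | IOException e) {
            // Not every stream implementation supports drop-behind; proceed without it
          }
          out.writeBytes("example data\n");
        }
      }
    }
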
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index e083c5aa14..a7e879dc8b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -35,12 +35,14 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileOperations.WriterBuilder;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
@@ -50,6 +52,8 @@ import org.apache.accumulo.core.iteratorsImpl.system.ColumnFamilySkippingIterato
 import org.apache.accumulo.core.iteratorsImpl.system.DeletingIterator;
 import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator;
 import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@ -212,9 +216,18 @@ public class FileCompactor implements Callable<CompactionStats> {
     try {
       FileOperations fileFactory = FileOperations.getInstance();
       FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());
-      mfw = fileFactory.newWriterBuilder()
+
+      boolean dropCacheBehindMajcOutput = !RootTable.ID.equals(this.extent.tableId())
+          && !MetadataTable.ID.equals(this.extent.tableId())
+          && acuTableConf.getBoolean(Property.TABLE_MAJC_OUTPUT_DROP_CACHE);
+
+      WriterBuilder outBuilder = fileFactory.newWriterBuilder()
           .forFile(outputFile.getMetaInsert(), ns, ns.getConf(), cryptoService)
-          .withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter()).build();
+          .withTableConfiguration(acuTableConf).withRateLimiter(env.getWriteLimiter());
+      if (dropCacheBehindMajcOutput) {
+        outBuilder.dropCachesBehind();
+      }
+      mfw = outBuilder.build();
 
       Map<String,Set<ByteSequence>> lGroups = getLocalityGroups(acuTableConf);