You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mi...@apache.org on 2022/10/27 19:57:36 UTC

[lucene] branch branch_9x updated: GITHUB#11795: Add FilterDirectory to track write amplification factor (#11796)

This is an automated email from the ASF dual-hosted git repository.

mikemccand pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/lucene.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 373d2e84c13 GITHUB#11795: Add FilterDirectory to track write amplification factor (#11796)
373d2e84c13 is described below

commit 373d2e84c13ee67e8e1247338e69b53946b7f726
Author: Marc D'Mello <ma...@gmail.com>
AuthorDate: Thu Oct 27 12:07:56 2022 -0700

    GITHUB#11795: Add FilterDirectory to track write amplification factor (#11796)
    
    * GITHUB#11795: Add FilterDirectory to track write amplification factor
    
    * addressed feedback
    
    * added optional temp output tracking and real time tracking
    
    * addressed more feedback
    
    * more improvements + added CHANGES.txt entry
    
    * format edit to CHANGES.txt
    
    * remove waf factor calculation
    
    Co-authored-by: Marc D'Mello <dm...@amazon.com>
---
 lucene/CHANGES.txt                                 |   7 +-
 .../lucene/misc/store/ByteTrackingIndexOutput.java |  91 +++++++++++++++++
 .../store/ByteWritesTrackingDirectoryWrapper.java  |  80 +++++++++++++++
 .../TestByteWritesTrackingDirectoryWrapper.java    | 111 +++++++++++++++++++++
 4 files changed, 288 insertions(+), 1 deletion(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 8d82137e403..6f7f6811e80 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -17,9 +17,14 @@ API Changes
 
 * GITHUB#11761: TieredMergePolicy now allowed a maximum allowable deletes percentage of down to 5%, and the default
   maximum allowable deletes percentage is changed from 33% to 20%. (Marc D'Mello)
-
+  
 * GITHUB#11822: Configure replicator PrimaryNode replia shutdown timeout. (Steven Schlansker)
 
+New Features
+---------------------
+* GITHUB#11795: Add ByteWritesTrackingDirectoryWrapper to expose metrics for bytes merged and flushed, which
+  can be used to compute the write amplification factor. (Marc D'Mello)
+
 Improvements
 ---------------------
 * GITHUB#11785: Improve Tessellator performance by delaying calls to the method
diff --git a/lucene/misc/src/java/org/apache/lucene/misc/store/ByteTrackingIndexOutput.java b/lucene/misc/src/java/org/apache/lucene/misc/store/ByteTrackingIndexOutput.java
new file mode 100644
index 00000000000..4adb1938b4a
--- /dev/null
+++ b/lucene/misc/src/java/org/apache/lucene/misc/store/ByteTrackingIndexOutput.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.misc.store;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * An {@link IndexOutput} that wraps another instance and tracks the number of bytes written.
+ *
+ * <p>All writes are delegated to the wrapped output. The first time this output is closed, the
+ * wrapped output's file pointer is added to the shared byte counter, so each output contributes
+ * exactly once no matter how many times {@link #close()} is called.
+ */
+public class ByteTrackingIndexOutput extends IndexOutput {
+
+  private final IndexOutput output;
+  private final AtomicLong byteTracker;
+  private boolean closed = false;
+
+  /**
+   * Wraps {@code output}, adding its length to {@code byteTracker} when it is first closed.
+   *
+   * @param output the output that receives all writes
+   * @param byteTracker the counter to add this output's length to
+   */
+  protected ByteTrackingIndexOutput(IndexOutput output, AtomicLong byteTracker) {
+    // getName() must stay the name the file was created with: callers of
+    // Directory#createTempOutput use it to reopen or delete the file.
+    super("Byte tracking wrapper for: " + output.getName(), output.getName());
+    this.output = output;
+    this.byteTracker = byteTracker;
+  }
+
+  @Override
+  public void writeByte(byte b) throws IOException {
+    output.writeByte(b);
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int offset, int length) throws IOException {
+    output.writeBytes(b, offset, length);
+  }
+
+  @Override
+  public void writeShort(short i) throws IOException {
+    output.writeShort(i);
+  }
+
+  @Override
+  public void writeInt(int i) throws IOException {
+    output.writeInt(i);
+  }
+
+  @Override
+  public void writeLong(long i) throws IOException {
+    output.writeLong(i);
+  }
+
+  /** Adds the bytes written to the tracker (first call only), then closes the wrapped output. */
+  @Override
+  public void close() throws IOException {
+    if (closed == false) {
+      // Capture the length before the wrapped output is closed.
+      byteTracker.addAndGet(output.getFilePointer());
+      closed = true;
+    }
+    output.close();
+  }
+
+  @Override
+  public long getFilePointer() {
+    return output.getFilePointer();
+  }
+
+  @Override
+  public long getChecksum() throws IOException {
+    return output.getChecksum();
+  }
+
+  /** Returns the wrapped output's name, which is also this output's {@link #getName()}. */
+  public String getWrappedName() {
+    return output.getName();
+  }
+
+  /** Returns the {@code toString()} of the wrapped output. */
+  public String getWrappedToString() {
+    return output.toString();
+  }
+}
diff --git a/lucene/misc/src/java/org/apache/lucene/misc/store/ByteWritesTrackingDirectoryWrapper.java b/lucene/misc/src/java/org/apache/lucene/misc/store/ByteWritesTrackingDirectoryWrapper.java
new file mode 100644
index 00000000000..6619bf8ddbe
--- /dev/null
+++ b/lucene/misc/src/java/org/apache/lucene/misc/store/ByteWritesTrackingDirectoryWrapper.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.misc.store;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+
+/**
+ * {@link FilterDirectory} that counts the bytes written by flushes and merges.
+ *
+ * <p>Outputs created with a {@link IOContext.Context#FLUSH} context add their length to {@link
+ * #getFlushedBytes()}, and outputs created with a {@link IOContext.Context#MERGE} context add
+ * theirs to {@link #getMergedBytes()}; the length is counted when each output is closed. Outputs
+ * created with any other context are returned unwrapped and are not counted. Temporary outputs are
+ * only tracked when {@link #trackTempOutput} is true.
+ *
+ * <p>The totals can be used to compute a write amplification factor; this class does not compute
+ * one itself.
+ */
+public final class ByteWritesTrackingDirectoryWrapper extends FilterDirectory {
+
+  private final AtomicLong flushedBytes = new AtomicLong();
+  private final AtomicLong mergedBytes = new AtomicLong();
+
+  /** Whether outputs created by {@link #createTempOutput} are tracked as well. */
+  public final boolean trackTempOutput;
+
+  /**
+   * Wraps {@code in} without tracking temporary outputs.
+   *
+   * @param in the directory to delegate to
+   */
+  public ByteWritesTrackingDirectoryWrapper(Directory in) {
+    this(in, false);
+  }
+
+  /**
+   * Constructor with option to track temporary outputs.
+   *
+   * @param in the directory to delegate to
+   * @param trackTempOutput if true, outputs created by {@link #createTempOutput} are tracked too
+   */
+  public ByteWritesTrackingDirectoryWrapper(Directory in, boolean trackTempOutput) {
+    super(in);
+    this.trackTempOutput = trackTempOutput;
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext ioContext) throws IOException {
+    IndexOutput output = in.createOutput(name, ioContext);
+    return createByteTrackingOutput(output, ioContext.context);
+  }
+
+  @Override
+  public IndexOutput createTempOutput(String prefix, String suffix, IOContext ioContext)
+      throws IOException {
+    IndexOutput output = in.createTempOutput(prefix, suffix, ioContext);
+    return trackTempOutput ? createByteTrackingOutput(output, ioContext.context) : output;
+  }
+
+  /**
+   * Wraps {@code output} so its length is added to the counter for {@code context}, or returns it
+   * unchanged when that context is not tracked.
+   */
+  private IndexOutput createByteTrackingOutput(IndexOutput output, IOContext.Context context) {
+    switch (context) {
+      case FLUSH:
+        return new ByteTrackingIndexOutput(output, flushedBytes);
+      case MERGE:
+        return new ByteTrackingIndexOutput(output, mergedBytes);
+      case DEFAULT:
+      case READ:
+      default:
+        return output;
+    }
+  }
+
+  /** Returns the number of bytes written by closed outputs created with a FLUSH context. */
+  public long getFlushedBytes() {
+    return flushedBytes.get();
+  }
+
+  /** Returns the number of bytes written by closed outputs created with a MERGE context. */
+  public long getMergedBytes() {
+    return mergedBytes.get();
+  }
+}
diff --git a/lucene/misc/src/test/org/apache/lucene/misc/store/TestByteWritesTrackingDirectoryWrapper.java b/lucene/misc/src/test/org/apache/lucene/misc/store/TestByteWritesTrackingDirectoryWrapper.java
new file mode 100644
index 00000000000..74d9e6e1b57
--- /dev/null
+++ b/lucene/misc/src/test/org/apache/lucene/misc/store/TestByteWritesTrackingDirectoryWrapper.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.misc.store;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import org.apache.lucene.store.ByteBuffersDirectory;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FlushInfo;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.MergeInfo;
+import org.apache.lucene.tests.store.BaseDirectoryTestCase;
+
+/** Tests for {@link ByteWritesTrackingDirectoryWrapper}. */
+public class TestByteWritesTrackingDirectoryWrapper extends BaseDirectoryTestCase {
+
+  public void testEmptyDir() throws Exception {
+    try (ByteWritesTrackingDirectoryWrapper dir =
+        new ByteWritesTrackingDirectoryWrapper(new ByteBuffersDirectory())) {
+      assertEquals(0, dir.getFlushedBytes());
+      assertEquals(0, dir.getMergedBytes());
+    }
+  }
+
+  public void testRandomOutput() throws Exception {
+    try (ByteWritesTrackingDirectoryWrapper dir =
+        new ByteWritesTrackingDirectoryWrapper(new ByteBuffersDirectory())) {
+      checkFlushThenMerge(dir, false);
+    }
+  }
+
+  public void testRandomTempOutput() throws Exception {
+    try (ByteWritesTrackingDirectoryWrapper dir =
+        new ByteWritesTrackingDirectoryWrapper(new ByteBuffersDirectory(), true)) {
+      checkFlushThenMerge(dir, true);
+    }
+  }
+
+  /**
+   * Writes random bytes under a FLUSH context, then under a MERGE context, and checks that each
+   * counter only changes once the corresponding output is closed.
+   *
+   * @param dir the directory under test
+   * @param useTempOutput if true, create outputs with createTempOutput instead of createOutput
+   */
+  private void checkFlushThenMerge(ByteWritesTrackingDirectoryWrapper dir, boolean useTempOutput)
+      throws IOException {
+    int expectedFlushBytes = random().nextInt(100);
+    int expectedMergeBytes = random().nextInt(100);
+
+    IOContext flushContext = new IOContext(new FlushInfo(10, expectedFlushBytes));
+    try (IndexOutput output = createTrackedOutput(dir, useTempOutput, "write", flushContext)) {
+      output.writeBytes(randomPayload(expectedFlushBytes), expectedFlushBytes);
+      // nothing is counted until the output is closed
+      assertEquals(0, dir.getFlushedBytes());
+      assertEquals(0, dir.getMergedBytes());
+    }
+    assertEquals(expectedFlushBytes, dir.getFlushedBytes());
+    assertEquals(0, dir.getMergedBytes());
+
+    IOContext mergeContext = new IOContext(new MergeInfo(10, expectedMergeBytes, false, 2));
+    try (IndexOutput output = createTrackedOutput(dir, useTempOutput, "merge", mergeContext)) {
+      output.writeBytes(randomPayload(expectedMergeBytes), expectedMergeBytes);
+      assertEquals(expectedFlushBytes, dir.getFlushedBytes());
+      assertEquals(0, dir.getMergedBytes());
+    }
+    assertEquals(expectedFlushBytes, dir.getFlushedBytes());
+    assertEquals(expectedMergeBytes, dir.getMergedBytes());
+  }
+
+  /** Creates a regular or temporary output in {@code dir}, depending on {@code temp}. */
+  private static IndexOutput createTrackedOutput(
+      ByteWritesTrackingDirectoryWrapper dir, boolean temp, String name, IOContext context)
+      throws IOException {
+    return temp ? dir.createTempOutput("temp", name, context) : dir.createOutput(name, context);
+  }
+
+  /** Returns an array of {@code length} random bytes. */
+  private static byte[] randomPayload(int length) {
+    byte[] bytes = new byte[length];
+    for (int i = 0; i < length; i++) {
+      bytes[i] = (byte) random().nextInt(127);
+    }
+    return bytes;
+  }
+
+  @Override
+  protected Directory getDirectory(Path path) throws IOException {
+    return new ByteWritesTrackingDirectoryWrapper(new ByteBuffersDirectory());
+  }
+}