You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rm...@apache.org on 2021/02/24 07:48:10 UTC

[lucene-solr] branch master updated: LUCENE-9794: Optimize skipBytes implementation in remaining DataInput subclasses

This is an automated email from the ASF dual-hosted git repository.

rmuir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 84a35df  LUCENE-9794: Optimize skipBytes implementation in remaining DataInput subclasses
84a35df is described below

commit 84a35dfaea27581174c1104e239187112a1b5d43
Author: Robert Muir <rm...@apache.org>
AuthorDate: Wed Feb 24 02:46:24 2021 -0500

    LUCENE-9794: Optimize skipBytes implementation in remaining DataInput subclasses
    
    Fix various DataInputs to no longer use skipBytesSlowly, add new tests.
---
 lucene/CHANGES.txt                                 |   2 +
 .../compressing/CompressingStoredFieldsReader.java |  10 +-
 .../org/apache/lucene/index/ByteSliceReader.java   |  16 ++-
 .../apache/lucene/store/ByteBuffersDataInput.java  |   6 +-
 .../apache/lucene/store/ChecksumIndexInput.java    |  34 ++++++-
 .../java/org/apache/lucene/store/DataInput.java    |   4 +-
 .../apache/lucene/store/InputStreamDataInput.java  |   9 +-
 .../java/org/apache/lucene/util/PagedBytes.java    |   8 +-
 .../apache/lucene/index/TestByteSliceReader.java   |  83 ++++++++++++++++
 .../lucene/store/TestByteBuffersDataInput.java     |  16 ++-
 .../lucene/store/TestInputStreamDataInput.java     | 110 +++++++++++++++++++++
 .../org/apache/lucene/util/TestPagedBytes.java     |  12 +++
 12 files changed, 295 insertions(+), 15 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 9bfe011..9b6bea99 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -194,6 +194,8 @@ Improvements
 * LUCENE-9663: Adding compression to terms dict from SortedSet/Sorted DocValues.
   (Jaison Bi via Bruno Roustant)
 
+* LUCENE-9794: Speed up implementations of DataInput.skipBytes(). (Greg Miller)
+
 Bug fixes
 
 
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
index f169e52..9b7fac4 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/compressing/CompressingStoredFieldsReader.java
@@ -654,7 +654,15 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
 
               @Override
               public void skipBytes(long numBytes) throws IOException {
-                skipBytesSlowly(numBytes);
+                if (numBytes < 0) {
+                  throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+                }
+                while (numBytes > bytes.length) {
+                  numBytes -= bytes.length;
+                  fillBuffer();
+                }
+                bytes.offset += numBytes;
+                bytes.length -= numBytes;
               }
             };
       } else {
diff --git a/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java b/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java
index 951d2c6..b0b65de 100644
--- a/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java
+++ b/lucene/core/src/java/org/apache/lucene/index/ByteSliceReader.java
@@ -140,7 +140,19 @@ final class ByteSliceReader extends DataInput {
   }
 
   @Override
-  public void skipBytes(long numBytes) throws IOException {
-    skipBytesSlowly(numBytes);
+  public void skipBytes(long numBytes) {
+    if (numBytes < 0) {
+      throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+    }
+    while (numBytes > 0) {
+      final int numLeft = limit - upto;
+      if (numLeft < numBytes) {
+        numBytes -= numLeft;
+        nextSlice();
+      } else {
+        upto += numBytes;
+        break;
+      }
+    }
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
index 57cfd5b..b6572f6 100644
--- a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
@@ -261,7 +261,11 @@ public final class ByteBuffersDataInput extends DataInput
 
   @Override
   public void skipBytes(long numBytes) throws IOException {
-    skipBytesSlowly(numBytes);
+    if (numBytes < 0) {
+      throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+    }
+    long skipTo = position() + numBytes;
+    seek(skipTo);
   }
 
   public ByteBuffersDataInput slice(long offset, long length) {
diff --git a/lucene/core/src/java/org/apache/lucene/store/ChecksumIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ChecksumIndexInput.java
index 04a21f0..6f4a1ae 100644
--- a/lucene/core/src/java/org/apache/lucene/store/ChecksumIndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/ChecksumIndexInput.java
@@ -24,6 +24,17 @@ import java.io.IOException;
  */
 public abstract class ChecksumIndexInput extends IndexInput {
 
+  private static final int SKIP_BUFFER_SIZE = 1024;
+
+  /* This buffer is used when skipping bytes in skipBytes(). Skipping bytes
+   * still requires reading in the bytes we skip in order to update the checksum.
+   * The reason we need to use an instance member instead of sharing a single
+   * static instance across threads is that multiple instances invoking skipBytes()
+   * concurrently on different threads can clobber the contents of a shared buffer,
+   * corrupting the checksum. See LUCENE-5583 for additional context.
+   */
+  private byte[] skipBuffer;
+
   /**
    * resourceDescription should be a non-null, opaque string describing this resource; it's returned
    * from {@link #toString}.
@@ -50,10 +61,23 @@ public abstract class ChecksumIndexInput extends IndexInput {
       throw new IllegalStateException(
           getClass() + " cannot seek backwards (pos=" + pos + " getFilePointer()=" + curFP + ")");
     }
-    // we must skip slowly to ensure skipped bytes are still read and used
-    // to update checksums
-    // TODO: this "slow skip" logic should be moved into this class once
-    //       no longer needed as default logic in DataInput
-    skipBytesSlowly(skip);
+    skipByReading(skip);
+  }
+
+  /**
+   * Skip over <code>numBytes</code> bytes. The contract on this method is that it should have the
+   * same behavior as reading the same number of bytes into a buffer and discarding its content.
+   * Negative values of <code>numBytes</code> are not supported.
+   */
+  private void skipByReading(long numBytes) throws IOException {
+    if (skipBuffer == null) {
+      skipBuffer = new byte[SKIP_BUFFER_SIZE];
+    }
+    assert skipBuffer.length == SKIP_BUFFER_SIZE;
+    for (long skipped = 0; skipped < numBytes; ) {
+      final int step = (int) Math.min(SKIP_BUFFER_SIZE, numBytes - skipped);
+      readBytes(skipBuffer, 0, step, false);
+      skipped += step;
+    }
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/store/DataInput.java b/lucene/core/src/java/org/apache/lucene/store/DataInput.java
index 1177619..d7d1ead 100644
--- a/lucene/core/src/java/org/apache/lucene/store/DataInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/DataInput.java
@@ -41,8 +41,8 @@ public abstract class DataInput implements Cloneable {
 
   private static final int SKIP_BUFFER_SIZE = 1024;
 
-  /* This buffer is used to skip over bytes with the default implementation of
-   * skipBytes. The reason why we need to use an instance member instead of
+  /* This buffer is used to skip over bytes in skipBytesSlowly.
+   * The reason why we need to use an instance member instead of
    * sharing a single instance across threads is that some delegating
    * implementations of DataInput might want to reuse the provided buffer in
    * order to eg. update the checksum. If we shared the same buffer across
diff --git a/lucene/core/src/java/org/apache/lucene/store/InputStreamDataInput.java b/lucene/core/src/java/org/apache/lucene/store/InputStreamDataInput.java
index f86185a..8039597 100644
--- a/lucene/core/src/java/org/apache/lucene/store/InputStreamDataInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/InputStreamDataInput.java
@@ -53,6 +53,13 @@ public class InputStreamDataInput extends DataInput implements Closeable {
 
   @Override
   public void skipBytes(long numBytes) throws IOException {
-    skipBytesSlowly(numBytes);
+    if (numBytes < 0) {
+      throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+    }
+    long skipped = is.skip(numBytes);
+    assert skipped <= numBytes;
+    if (skipped < numBytes) {
+      throw new EOFException();
+    }
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java b/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
index 6916ef1..dd495a9 100644
--- a/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
+++ b/lucene/core/src/java/org/apache/lucene/util/PagedBytes.java
@@ -345,8 +345,12 @@ public final class PagedBytes implements Accountable {
     }
 
     @Override
-    public void skipBytes(long numBytes) throws IOException {
-      skipBytesSlowly(numBytes);
+    public void skipBytes(long numBytes) {
+      if (numBytes < 0) {
+        throw new IllegalArgumentException("numBytes must be >= 0, got " + numBytes);
+      }
+      final long skipTo = getPosition() + numBytes;
+      setPosition(skipTo);
     }
 
     private void nextBlock() {
diff --git a/lucene/core/src/test/org/apache/lucene/index/TestByteSliceReader.java b/lucene/core/src/test/org/apache/lucene/index/TestByteSliceReader.java
new file mode 100644
index 0000000..b0bc2f9
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/index/TestByteSliceReader.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.index;
+
+import java.util.Random;
+import org.apache.lucene.util.ByteBlockPool;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class TestByteSliceReader extends LuceneTestCase {
+  private static byte[] RANDOM_DATA;
+  private static ByteBlockPool BLOCK_POOL;
+  private static int BLOCK_POOL_END;
+
+  @BeforeClass
+  public static void beforeClass() {
+    int len = atLeast(100);
+    RANDOM_DATA = new byte[len];
+    random().nextBytes(RANDOM_DATA);
+
+    BLOCK_POOL = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
+    BLOCK_POOL.nextBuffer();
+    byte[] buffer = BLOCK_POOL.buffer;
+    int upto = BLOCK_POOL.newSlice(ByteBlockPool.FIRST_LEVEL_SIZE);
+    for (byte randomByte : RANDOM_DATA) {
+      if ((buffer[upto] & 16) != 0) {
+        upto = BLOCK_POOL.allocSlice(buffer, upto);
+        buffer = BLOCK_POOL.buffer;
+      }
+      buffer[upto++] = randomByte;
+    }
+    BLOCK_POOL_END = upto;
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    RANDOM_DATA = null;
+    BLOCK_POOL = null;
+  }
+
+  public void testReadByte() {
+    ByteSliceReader sliceReader = new ByteSliceReader();
+    sliceReader.init(BLOCK_POOL, 0, BLOCK_POOL_END);
+    for (byte expected : RANDOM_DATA) {
+      assertEquals(expected, sliceReader.readByte());
+    }
+  }
+
+  public void testSkipBytes() {
+    Random random = random();
+    ByteSliceReader sliceReader = new ByteSliceReader();
+
+    int maxSkipTo = RANDOM_DATA.length - 1;
+    int iterations = atLeast(random, 10);
+    for (int i = 0; i < iterations; i++) {
+      sliceReader.init(BLOCK_POOL, 0, BLOCK_POOL_END);
+      // skip random chunks of bytes until exhausted
+      for (int curr = 0; curr < maxSkipTo; ) {
+        int skipTo = TestUtil.nextInt(random, curr, maxSkipTo);
+        int step = skipTo - curr;
+        sliceReader.skipBytes(step);
+        assertEquals(RANDOM_DATA[skipTo], sliceReader.readByte());
+        curr = skipTo + 1; // +1 for read byte
+      }
+    }
+  }
+}
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
index 8d8bcd4..fdbcd42 100644
--- a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
+++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
@@ -139,7 +139,7 @@ public final class TestByteBuffersDataInput extends RandomizedTest {
   }
 
   @Test
-  public void testSeek() throws Exception {
+  public void testSeekAndSkip() throws Exception {
     for (int reps = randomIntBetween(1, 200); --reps > 0; ) {
       ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
 
@@ -169,12 +169,26 @@ public final class TestByteBuffersDataInput extends RandomizedTest {
       byte[] array = dst.toArrayCopy();
       array = ArrayUtil.copyOfSubArray(array, prefix.length, array.length);
 
+      // test seeking
       for (int i = 0; i < 1000; i++) {
         int offs = randomIntBetween(0, array.length - 1);
         in.seek(offs);
         assertEquals(offs, in.position());
         assertEquals(array[offs], in.readByte());
       }
+
+      // test skipping
+      int maxSkipTo = array.length - 1;
+      in.seek(0);
+      // skip chunks of bytes until exhausted
+      for (int curr = 0; curr < maxSkipTo; ) {
+        int skipTo = randomIntBetween(curr, maxSkipTo);
+        int step = skipTo - curr;
+        in.skipBytes(step);
+        assertEquals(array[skipTo], in.readByte());
+        curr = skipTo + 1; // +1 for read byte
+      }
+
       in.seek(in.size());
       assertEquals(in.size(), in.position());
       LuceneTestCase.expectThrows(
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestInputStreamDataInput.java b/lucene/core/src/test/org/apache/lucene/store/TestInputStreamDataInput.java
new file mode 100644
index 0000000..26f8fd4
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/TestInputStreamDataInput.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.TestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+public class TestInputStreamDataInput extends LuceneTestCase {
+  private static byte[] RANDOM_DATA;
+  private InputStreamDataInput in;
+
+  @BeforeClass
+  public static void beforeClass() {
+    RANDOM_DATA = new byte[atLeast(100)];
+    random().nextBytes(RANDOM_DATA);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    RANDOM_DATA = null;
+  }
+
+  @Before
+  public void before() {
+    in = new NoReadInputStreamDataInput(new ByteArrayInputStream(RANDOM_DATA));
+  }
+
+  @After
+  public void after() throws IOException {
+    in.close();
+    in = null;
+  }
+
+  public void testSkipBytes() throws IOException {
+    Random random = random();
+    // not using the wrapped (NoReadInputStreamDataInput) here since we want to actually read and
+    // verify
+    InputStreamDataInput in = new InputStreamDataInput(new ByteArrayInputStream(RANDOM_DATA));
+    int maxSkipTo = RANDOM_DATA.length - 1;
+    // skip chunks of bytes until exhausted
+    for (int curr = 0; curr < maxSkipTo; ) {
+      int skipTo = TestUtil.nextInt(random, curr, maxSkipTo);
+      int step = skipTo - curr;
+      in.skipBytes(step);
+      assertEquals(RANDOM_DATA[skipTo], in.readByte());
+      curr = skipTo + 1; // +1 for read byte
+    }
+    in.close();
+  }
+
+  public void testNoReadWhenSkipping() throws IOException {
+    Random random = random();
+    int maxSkipTo = RANDOM_DATA.length - 1;
+    // skip chunks of bytes until exhausted
+    for (int curr = 0; curr < maxSkipTo; ) {
+      int step = TestUtil.nextInt(random, 0, maxSkipTo - curr);
+      in.skipBytes(step);
+      curr += step;
+    }
+  }
+
+  public void testFullSkip() throws IOException {
+    in.skipBytes(RANDOM_DATA.length);
+  }
+
+  public void testSkipOffEnd() {
+    expectThrows(EOFException.class, () -> in.skipBytes(RANDOM_DATA.length + 1));
+  }
+
+  /** Throws on any attempt to read bytes, to ensure that skipBytes doesn't invoke read. */
+  private static final class NoReadInputStreamDataInput extends InputStreamDataInput {
+
+    public NoReadInputStreamDataInput(InputStream is) {
+      super(is);
+    }
+
+    @Override
+    public void readBytes(byte[] b, int offset, int len) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public byte readByte() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
diff --git a/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java b/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java
index 837259e..28050b4 100644
--- a/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java
+++ b/lucene/core/src/test/org/apache/lucene/util/TestPagedBytes.java
@@ -155,6 +155,18 @@ public class TestPagedBytes extends LuceneTestCase {
           assertEquals(answer[pos + byteUpto], slice.bytes[slice.offset + byteUpto]);
         }
       }
+
+      // test skipping
+      final DataInput in2 = p.getDataInput();
+      int maxSkipTo = numBytes - 1;
+      // skip chunks of bytes until exhausted
+      for (int curr = 0; curr < maxSkipTo; ) {
+        int skipTo = TestUtil.nextInt(random, curr, maxSkipTo);
+        int step = skipTo - curr;
+        in2.skipBytes(step);
+        assertEquals(answer[skipTo], in2.readByte());
+        curr = skipTo + 1; // +1 for read byte
+      }
     }
   }