You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/18 14:07:33 UTC

[2/3] cassandra git commit: Implement a more efficient skipBytes (CASSANDRA-10322)

Implement a more efficient skipBytes (CASSANDRA-10322)

Avoids garbage production during a call to skipBytes on
any DataInputPlus, and strengthens the skipBytes contract
so that it always skips the requested number of bytes
unless EOF is encountered. Also moves
FileUtils.skipBytesFully to DataInputPlus, where it becomes
a default method that throws EOFException if fewer bytes
than requested could be skipped.

patch by benedict; reviewed by jbellis for CASSANDRA-10322


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ff27eb30
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ff27eb30
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ff27eb30

Branch: refs/heads/trunk
Commit: ff27eb304c84a5b542b8a5da1712f289fa45ba81
Parents: 3355caf
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Sep 14 19:47:06 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 18 13:06:12 2015 +0100

----------------------------------------------------------------------
 .../org/apache/cassandra/db/DeletionTime.java   |  2 +-
 .../org/apache/cassandra/db/RowIndexEntry.java  |  4 ++--
 .../cassandra/db/marshal/AbstractType.java      |  2 +-
 .../apache/cassandra/io/util/DataInputPlus.java | 21 ++++++++++++++-----
 .../org/apache/cassandra/io/util/FileUtils.java | 12 -----------
 .../io/util/RebufferingInputStream.java         | 22 +++++++++++++-------
 .../org/apache/cassandra/net/MessageIn.java     |  2 +-
 .../apache/cassandra/utils/ByteBufferUtil.java  |  6 +++---
 8 files changed, 38 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff27eb30/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 343a6c2..919c603 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -173,7 +173,7 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
 
         public void skip(DataInputPlus in) throws IOException
         {
-            FileUtils.skipBytesFully(in, 4 + 8);
+            in.skipBytesFully(4 + 8);
         }
 
         public long serializedSize(DeletionTime delTime)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff27eb30/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index 198e890..4e2f063 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -217,7 +217,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
                 for (int i = 0; i < entries; i++)
                     columnsIndex.add(idxSerializer.deserialize(in));
 
-                FileUtils.skipBytesFully(in, entries * TypeSizes.sizeof(0));
+                in.skipBytesFully(entries * TypeSizes.sizeof(0));
 
                 return new IndexedEntry(position, deletionTime, headerLength, columnsIndex);
             }
@@ -247,7 +247,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
             if (size <= 0)
                 return;
 
-            FileUtils.skipBytesFully(in, size);
+            in.skipBytesFully(size);
         }
 
         public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff27eb30/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 5c46823..a0d915f 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -395,7 +395,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
     {
         int length = valueLengthIfFixed();
         if (length >= 0)
-            FileUtils.skipBytesFully(in, length);
+            in.skipBytesFully(length);
         else
             ByteBufferUtil.skipWithVIntLength(in);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff27eb30/src/java/org/apache/cassandra/io/util/DataInputPlus.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataInputPlus.java b/src/java/org/apache/cassandra/io/util/DataInputPlus.java
index a029427..7c29ee1 100644
--- a/src/java/org/apache/cassandra/io/util/DataInputPlus.java
+++ b/src/java/org/apache/cassandra/io/util/DataInputPlus.java
@@ -17,10 +17,7 @@
  */
 package org.apache.cassandra.io.util;
 
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
 
 import org.apache.cassandra.utils.vint.VIntCoding;
 
@@ -29,7 +26,6 @@ import org.apache.cassandra.utils.vint.VIntCoding;
  */
 public interface DataInputPlus extends DataInput
 {
-
     default long readVInt() throws IOException
     {
         return VIntCoding.readVInt(this);
@@ -48,6 +44,21 @@ public interface DataInputPlus extends DataInput
     }
 
     /**
+     * Always skips the requested number of bytes, unless EOF is reached
+     *
+     * @param n number of bytes to skip
+     * @return number of bytes skipped
+     */
+    public int skipBytes(int n) throws IOException;
+
+    public default void skipBytesFully(int n) throws IOException
+    {
+        int skipped = skipBytes(n);
+        if (skipped != n)
+            throw new EOFException("EOF after " + skipped + " bytes out of " + n);
+    }
+
+    /**
      * Wrapper around an InputStream that provides no buffering but can decode varints
      */
     public class DataInputStreamPlus extends DataInputStream implements DataInputPlus

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff27eb30/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 8b7b1e1..78eeb8f 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -458,18 +458,6 @@ public class FileUtils
         dir.deleteOnExit();
     }
 
-    public static void skipBytesFully(DataInput in, int bytes) throws IOException
-    {
-        int n = 0;
-        while (n < bytes)
-        {
-            int skipped = in.skipBytes(bytes - n);
-            if (skipped == 0)
-                throw new EOFException("EOF after " + n + " bytes out of " + bytes);
-            n += skipped;
-        }
-    }
-
     public static void handleCorruptSSTable(CorruptSSTableException e)
     {
         if (!StorageService.instance.isSetupCompleted())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff27eb30/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
index 958a815..3068746 100644
--- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -112,16 +112,22 @@ public abstract class RebufferingInputStream extends InputStream implements Data
     @Override
     public int skipBytes(int n) throws IOException
     {
-        int skipped = 0;
-
-        while (skipped < n)
+        if (n < 0)
+            return 0;
+        int requested = n;
+        int position = buffer.position(), limit = buffer.limit(), remaining;
+        while ((remaining = limit - position) < n)
         {
-            int skippedThisTime = (int)skip(n - skipped);
-            if (skippedThisTime <= 0) break;
-            skipped += skippedThisTime;
+            n -= remaining;
+            buffer.position(limit);
+            reBuffer();
+            position = buffer.position();
+            limit = buffer.limit();
+            if (position == limit)
+                return requested - n;
         }
-
-        return skipped;
+        buffer.position(position + n);
+        return requested;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff27eb30/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 82f4000..64b8e81 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -88,7 +88,7 @@ public class MessageIn<T>
             if (callback == null)
             {
                 // reply for expired callback.  we'll have to skip it.
-                FileUtils.skipBytesFully(in, payloadSize);
+                in.skipBytesFully(payloadSize);
                 return null;
             }
             serializer = (IVersionedSerializer<T2>) callback.serializer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff27eb30/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 70d4bd5..27f46b6 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -357,7 +357,7 @@ public class ByteBufferUtil
         if (length < 0)
             throw new IOException("Corrupt (negative) value length encountered");
 
-        FileUtils.skipBytesFully(in, length);
+        in.skipBytesFully(length);
     }
 
     /* @return An unsigned short in an integer. */
@@ -387,10 +387,10 @@ public class ByteBufferUtil
      * @return null
      * @throws IOException if an I/O error occurs.
      */
-    public static void skipShortLength(DataInput in) throws IOException
+    public static void skipShortLength(DataInputPlus in) throws IOException
     {
         int skip = readShortLength(in);
-        FileUtils.skipBytesFully(in, skip);
+        in.skipBytesFully(skip);
     }
 
     public static ByteBuffer read(DataInput in, int length) throws IOException