You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/02/22 01:09:28 UTC

[geode] branch feature/GEODE-6414 updated: added a new method to HeapDataOutputStream that supports skipping some of the initial bytes when converting the HeapDataOutputStream to a ByteBuffer. This will get rid of some extra copying when creating a PdxInstance that is larger than 1k.

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch feature/GEODE-6414
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6414 by this push:
     new d593292  added a new method to HeapDataOutputStream that supports skipping some of the initial bytes when converting the HeapDataOutputStream to a ByteBuffer. This will get rid of some extra copying when creating a PdxInstance that is larger than 1k.
d593292 is described below

commit d5932920baf816ca2766a7b7279ed0ec973a09ea
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Feb 21 17:07:27 2019 -0800

    added a new method to HeapDataOutputStream that supports
    skipping some of the initial bytes when converting the
    HeapDataOutputStream to a ByteBuffer.
    This will get rid of some extra copying when creating
    a PdxInstance that is larger than 1k.
---
 .../geode/internal/HeapDataOutputStream.java       | 42 ++++++++---
 .../apache/geode/pdx/internal/PdxOutputStream.java |  4 ++
 .../apache/geode/pdx/internal/PdxWriterImpl.java   | 24 ++-----
 .../geode/internal/HeapDataOutputStreamTest.java   | 35 +++++++++
 .../geode/pdx/internal/PdxWriterImplTest.java      | 83 ----------------------
 5 files changed, 79 insertions(+), 109 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
index 5efddee..4d165b2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
@@ -70,6 +70,7 @@ public class HeapDataOutputStream extends OutputStream
   private Version version;
   private boolean doNotCopy;
 
+  static final int SMALLEST_CHUNK_SIZE = 32;
   private static final int INITIAL_CAPACITY = 1024;
 
   public HeapDataOutputStream(Version version) {
@@ -102,8 +103,8 @@ public class HeapDataOutputStream extends OutputStream
    *        instead referenced.
    */
   public HeapDataOutputStream(int allocSize, Version version, boolean doNotCopy) {
-    if (allocSize < 32) {
-      this.MIN_CHUNK_SIZE = 32;
+    if (allocSize < SMALLEST_CHUNK_SIZE) {
+      this.MIN_CHUNK_SIZE = SMALLEST_CHUNK_SIZE;
     } else {
       this.MIN_CHUNK_SIZE = allocSize;
     }
@@ -340,21 +341,34 @@ public class HeapDataOutputStream extends OutputStream
     if (this.chunks != null) {
       final int size = size();
       ByteBuffer newBuffer = ByteBuffer.allocate(size);
-      int newBufPos = 0;
       for (ByteBuffer bb : this.chunks) {
         newBuffer.put(bb);
-        newBufPos += bb.position();
-        newBuffer.position(newBufPos); // works around JRockit 1.4.2.04 bug
       }
       this.chunks = null;
       newBuffer.put(this.buffer);
-      newBufPos += this.buffer.position();
-      newBuffer.position(newBufPos); // works around JRockit 1.4.2.04 bug
       this.buffer = newBuffer;
       this.buffer.flip(); // now ready for reading
     }
   }
 
+  private void consolidateChunks(int startPosition) {
+    assert startPosition < SMALLEST_CHUNK_SIZE;
+    final int size = size() - startPosition;
+    ByteBuffer newBuffer = ByteBuffer.allocate(size);
+    if (this.chunks != null) {
+      this.chunks.getFirst().position(startPosition);
+      for (ByteBuffer bb : this.chunks) {
+        newBuffer.put(bb);
+      }
+      this.chunks = null;
+    } else {
+      this.buffer.position(startPosition);
+    }
+    newBuffer.put(this.buffer);
+    newBuffer.flip(); // now ready for reading
+    this.buffer = newBuffer;
+  }
+
   /**
    * Prepare the contents for sending again
    */
@@ -425,7 +439,7 @@ public class HeapDataOutputStream extends OutputStream
   }
 
   /**
-   * gets the contents of this stream as s ByteBuffer, ready for reading. The stream should not be
+   * gets the contents of this stream as a ByteBuffer, ready for reading. The stream should not be
    * written to past this point until it has been reset.
    */
   public ByteBuffer toByteBuffer() {
@@ -435,6 +449,18 @@ public class HeapDataOutputStream extends OutputStream
   }
 
   /**
+   * gets the contents of this stream, starting at startPosition, as a ByteBuffer that is ready
+   * for reading. The stream should not be written to past this point until it has been reset.
+   *
+   * @param startPosition index of the first byte to copy; must be less than SMALLEST_CHUNK_SIZE.
+   */
+  public ByteBuffer toByteBuffer(int startPosition) {
+    finishWriting();
+    consolidateChunks(startPosition);
+    return this.buffer;
+  }
+
+  /**
    * gets the contents of this stream as a byte[]. The stream should not be written to past this
    * point until it has been reset.
    */
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxOutputStream.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxOutputStream.java
index c7c1b1b..44d31c2 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxOutputStream.java
@@ -220,6 +220,10 @@ public class PdxOutputStream implements ByteBufferWriter {
     return this.hdos.toByteBuffer();
   }
 
+  public ByteBuffer toByteBuffer(int startPosition) {
+    return this.hdos.toByteBuffer(startPosition);
+  }
+
   public byte[] toByteArray() {
     return this.hdos.toByteArray();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxWriterImpl.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxWriterImpl.java
index 610ef07..ea74414 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxWriterImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxWriterImpl.java
@@ -946,28 +946,16 @@ public class PdxWriterImpl implements PdxWriter {
   }
 
   PdxInstance makePdxInstance() {
-    ByteBuffer bb = this.os.toByteBuffer();
-    bb.get(); // skip PDX DSCODE
-    int len = bb.getInt();
-    bb.getInt(); // skip PDX type
+    final int DSCODE_SIZE = 1;
+    final int LENGTH_SIZE = 4;
+    final int PDX_TYPE_SIZE = 4;
+    final int BYTES_TO_SKIP = DSCODE_SIZE + LENGTH_SIZE + PDX_TYPE_SIZE;
+    ByteBuffer bb = this.os.toByteBuffer(BYTES_TO_SKIP);
     PdxType pt = this.newType;
     if (pt == null) {
       pt = this.existingType;
     }
-    return new PdxInstanceImpl(pt, new PdxInputStream(copyRemainingBytes(bb)), len);
-  }
-
-  /**
-   * Copies the remaining bytes in source
-   * (that is, from its current position to its limit)
-   * into a new heap ByteBuffer that is returned.
-   */
-  static ByteBuffer copyRemainingBytes(ByteBuffer source) {
-    ByteBuffer slice = source.slice();
-    ByteBuffer result = ByteBuffer.allocate(slice.capacity());
-    result.put(slice);
-    result.flip();
-    return result;
+    return new PdxInstanceImpl(pt, new PdxInputStream(bb), bb.limit());
   }
 
   public static boolean isPdx(byte[] valueBytes) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/HeapDataOutputStreamTest.java b/geode-core/src/test/java/org/apache/geode/internal/HeapDataOutputStreamTest.java
index 4638fa0..8ef60b1 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/HeapDataOutputStreamTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/HeapDataOutputStreamTest.java
@@ -14,10 +14,14 @@
  */
 package org.apache.geode.internal;
 
+import static org.apache.geode.internal.HeapDataOutputStream.SMALLEST_CHUNK_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -33,4 +37,35 @@ public class HeapDataOutputStreamTest {
     when(mockHeapDataOutputStream.getVersion()).thenReturn(mockVersion);
     assertThat(mockHeapDataOutputStream.getVersion()).isEqualTo(mockVersion);
   }
+
+  @Test
+  public void toByteBufferWithStartPositionAndNoChunksReturnsCorrectByteBuffer()
+      throws IOException {
+    HeapDataOutputStream heapDataOutputStream = new HeapDataOutputStream(SMALLEST_CHUNK_SIZE, null);
+    heapDataOutputStream.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
+
+    ByteBuffer result = heapDataOutputStream.toByteBuffer(9);
+
+    assertThat(result.remaining()).isEqualTo(1);
+    assertThat(result.get(0)).isEqualTo((byte) 10);
+  }
+
+  @Test
+  public void toByteBufferWithStartPositionAndChunksReturnsCorrectByteBuffer() throws IOException {
+    HeapDataOutputStream heapDataOutputStream = new HeapDataOutputStream(SMALLEST_CHUNK_SIZE, null);
+    heapDataOutputStream.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
+    byte[] chunk = new byte[SMALLEST_CHUNK_SIZE];
+    for (byte i = 0; i < SMALLEST_CHUNK_SIZE; i++) {
+      chunk[i] = i;
+    }
+    heapDataOutputStream.write(chunk);
+
+    ByteBuffer result = heapDataOutputStream.toByteBuffer(9);
+
+    assertThat(result.remaining()).isEqualTo(SMALLEST_CHUNK_SIZE + 1);
+    assertThat(result.get(0)).isEqualTo((byte) 10);
+    for (byte i = 0; i < SMALLEST_CHUNK_SIZE; i++) {
+      assertThat(result.get(i + 1)).isEqualTo(i);
+    }
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/internal/PdxWriterImplTest.java b/geode-core/src/test/java/org/apache/geode/pdx/internal/PdxWriterImplTest.java
deleted file mode 100644
index 6a2068f..0000000
--- a/geode-core/src/test/java/org/apache/geode/pdx/internal/PdxWriterImplTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.pdx.internal;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.test.junit.categories.SerializationTest;
-
-@Category({SerializationTest.class})
-public class PdxWriterImplTest {
-
-  @Test
-  public void copyingEmptySourceReturnsEmptyResult() {
-    ByteBuffer source = ByteBuffer.allocate(10);
-    source.limit(0);
-
-    ByteBuffer result = PdxWriterImpl.copyRemainingBytes(source);
-
-    assertThat(source.position()).isEqualTo(0);
-    assertThat(source.limit()).isEqualTo(0);
-    assertThat(result.capacity()).isEqualTo(0);
-    assertThat(result.remaining()).isEqualTo(0);
-  }
-
-  @Test
-  public void copyingFullSourceReturnsFullResult() {
-    ByteBuffer source = ByteBuffer.allocate(10);
-    source.put((byte) 1);
-    source.put((byte) 2);
-    source.put((byte) 3);
-    source.flip();
-
-    ByteBuffer result = PdxWriterImpl.copyRemainingBytes(source);
-
-    assertThat(source.position()).isEqualTo(0);
-    assertThat(source.limit()).isEqualTo(3);
-    assertThat(result.remaining()).isEqualTo(3);
-    assertThat(result.capacity()).isEqualTo(3);
-    assertThat(result.get(0)).isEqualTo((byte) 1);
-    assertThat(result.get(1)).isEqualTo((byte) 2);
-    assertThat(result.get(2)).isEqualTo((byte) 3);
-  }
-
-  @Test
-  public void copyingPartialSourceReturnsPartialResult() {
-    ByteBuffer source = ByteBuffer.allocate(10);
-    source.put((byte) 1);
-    source.put((byte) 2);
-    source.put((byte) 3);
-    source.put((byte) 4);
-    source.position(1);
-    source.limit(3);
-
-    ByteBuffer result = PdxWriterImpl.copyRemainingBytes(source);
-
-    assertThat(source.position()).isEqualTo(1);
-    assertThat(source.limit()).isEqualTo(3);
-    assertThat(result.remaining()).isEqualTo(2);
-    assertThat(result.capacity()).isEqualTo(2);
-    assertThat(result.get(0)).isEqualTo((byte) 2);
-    assertThat(result.get(1)).isEqualTo((byte) 3);
-  }
-
-}