You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2019/02/22 01:09:28 UTC
[geode] branch feature/GEODE-6414 updated: added a new method to
HeapDataOutputStream that supports skipping some of the initial bytes when
converting the HeapDataOutputStream to a ByteBuffer. This will get rid of
some extra copying when creating a PdxInstance that is larger than 1k.
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch feature/GEODE-6414
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6414 by this push:
new d593292 added a new method to HeapDataOutputStream that supports skipping some of the initial bytes when converting the HeapDataOutputStream to a ByteBuffer. This will get rid of some extra copying when creating a PdxInstance that is larger than 1k.
d593292 is described below
commit d5932920baf816ca2766a7b7279ed0ec973a09ea
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Feb 21 17:07:27 2019 -0800
added a new method to HeapDataOutputStream that supports
skipping some of the initial bytes when converting the
HeapDataOutputStream to a ByteBuffer.
This will get rid of some extra copying when creating
a PdxInstance that is larger than 1k.
---
.../geode/internal/HeapDataOutputStream.java | 42 ++++++++---
.../apache/geode/pdx/internal/PdxOutputStream.java | 4 ++
.../apache/geode/pdx/internal/PdxWriterImpl.java | 24 ++-----
.../geode/internal/HeapDataOutputStreamTest.java | 35 +++++++++
.../geode/pdx/internal/PdxWriterImplTest.java | 83 ----------------------
5 files changed, 79 insertions(+), 109 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
index 5efddee..4d165b2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
@@ -70,6 +70,7 @@ public class HeapDataOutputStream extends OutputStream
private Version version;
private boolean doNotCopy;
+ static final int SMALLEST_CHUNK_SIZE = 32;
private static final int INITIAL_CAPACITY = 1024;
public HeapDataOutputStream(Version version) {
@@ -102,8 +103,8 @@ public class HeapDataOutputStream extends OutputStream
* instead referenced.
*/
public HeapDataOutputStream(int allocSize, Version version, boolean doNotCopy) {
- if (allocSize < 32) {
- this.MIN_CHUNK_SIZE = 32;
+ if (allocSize < SMALLEST_CHUNK_SIZE) {
+ this.MIN_CHUNK_SIZE = SMALLEST_CHUNK_SIZE;
} else {
this.MIN_CHUNK_SIZE = allocSize;
}
@@ -340,21 +341,34 @@ public class HeapDataOutputStream extends OutputStream
if (this.chunks != null) {
final int size = size();
ByteBuffer newBuffer = ByteBuffer.allocate(size);
- int newBufPos = 0;
for (ByteBuffer bb : this.chunks) {
newBuffer.put(bb);
- newBufPos += bb.position();
- newBuffer.position(newBufPos); // works around JRockit 1.4.2.04 bug
}
this.chunks = null;
newBuffer.put(this.buffer);
- newBufPos += this.buffer.position();
- newBuffer.position(newBufPos); // works around JRockit 1.4.2.04 bug
this.buffer = newBuffer;
this.buffer.flip(); // now ready for reading
}
}
+ private void consolidateChunks(int startPosition) {
+ assert startPosition < SMALLEST_CHUNK_SIZE;
+ final int size = size() - startPosition;
+ ByteBuffer newBuffer = ByteBuffer.allocate(size);
+ if (this.chunks != null) {
+ this.chunks.getFirst().position(startPosition);
+ for (ByteBuffer bb : this.chunks) {
+ newBuffer.put(bb);
+ }
+ this.chunks = null;
+ } else {
+ this.buffer.position(startPosition);
+ }
+ newBuffer.put(this.buffer);
+ newBuffer.flip(); // now ready for reading
+ this.buffer = newBuffer;
+ }
+
/**
* Prepare the contents for sending again
*/
@@ -425,7 +439,7 @@ public class HeapDataOutputStream extends OutputStream
}
/**
- * gets the contents of this stream as s ByteBuffer, ready for reading. The stream should not be
+ * gets the contents of this stream as a ByteBuffer, ready for reading. The stream should not be
* written to past this point until it has been reset.
*/
public ByteBuffer toByteBuffer() {
@@ -435,6 +449,18 @@ public class HeapDataOutputStream extends OutputStream
}
/**
+ * gets the contents of this stream, minus its first startPosition bytes, as a ByteBuffer ready
+ * for reading. The stream should not be written to past this point until it has been reset.
+ *
+ * @param startPosition the position of the first byte to copy into the returned buffer.
+ */
+ public ByteBuffer toByteBuffer(int startPosition) {
+ finishWriting();
+ consolidateChunks(startPosition);
+ return this.buffer;
+ }
+
+ /**
* gets the contents of this stream as a byte[]. The stream should not be written to past this
* point until it has been reset.
*/
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxOutputStream.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxOutputStream.java
index c7c1b1b..44d31c2 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxOutputStream.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxOutputStream.java
@@ -220,6 +220,10 @@ public class PdxOutputStream implements ByteBufferWriter {
return this.hdos.toByteBuffer();
}
+ public ByteBuffer toByteBuffer(int startPosition) {
+ return this.hdos.toByteBuffer(startPosition);
+ }
+
public byte[] toByteArray() {
return this.hdos.toByteArray();
}
diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxWriterImpl.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxWriterImpl.java
index 610ef07..ea74414 100644
--- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxWriterImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PdxWriterImpl.java
@@ -946,28 +946,16 @@ public class PdxWriterImpl implements PdxWriter {
}
PdxInstance makePdxInstance() {
- ByteBuffer bb = this.os.toByteBuffer();
- bb.get(); // skip PDX DSCODE
- int len = bb.getInt();
- bb.getInt(); // skip PDX type
+ final int DSCODE_SIZE = 1;
+ final int LENGTH_SIZE = 4;
+ final int PDX_TYPE_SIZE = 4;
+ final int BYTES_TO_SKIP = DSCODE_SIZE + LENGTH_SIZE + PDX_TYPE_SIZE;
+ ByteBuffer bb = this.os.toByteBuffer(BYTES_TO_SKIP);
PdxType pt = this.newType;
if (pt == null) {
pt = this.existingType;
}
- return new PdxInstanceImpl(pt, new PdxInputStream(copyRemainingBytes(bb)), len);
- }
-
- /**
- * Copies the remaining bytes in source
- * (that is, from its current position to its limit)
- * into a new heap ByteBuffer that is returned.
- */
- static ByteBuffer copyRemainingBytes(ByteBuffer source) {
- ByteBuffer slice = source.slice();
- ByteBuffer result = ByteBuffer.allocate(slice.capacity());
- result.put(slice);
- result.flip();
- return result;
+ return new PdxInstanceImpl(pt, new PdxInputStream(bb), bb.limit());
}
public static boolean isPdx(byte[] valueBytes) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/HeapDataOutputStreamTest.java b/geode-core/src/test/java/org/apache/geode/internal/HeapDataOutputStreamTest.java
index 4638fa0..8ef60b1 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/HeapDataOutputStreamTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/HeapDataOutputStreamTest.java
@@ -14,10 +14,14 @@
*/
package org.apache.geode.internal;
+import static org.apache.geode.internal.HeapDataOutputStream.SMALLEST_CHUNK_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -33,4 +37,35 @@ public class HeapDataOutputStreamTest {
when(mockHeapDataOutputStream.getVersion()).thenReturn(mockVersion);
assertThat(mockHeapDataOutputStream.getVersion()).isEqualTo(mockVersion);
}
+
+ @Test
+ public void toByteBufferWithStartPositionAndNoChunksReturnsCorrectByteBuffer()
+ throws IOException {
+ HeapDataOutputStream heapDataOutputStream = new HeapDataOutputStream(SMALLEST_CHUNK_SIZE, null);
+ heapDataOutputStream.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
+
+ ByteBuffer result = heapDataOutputStream.toByteBuffer(9);
+
+ assertThat(result.remaining()).isEqualTo(1);
+ assertThat(result.get(0)).isEqualTo((byte) 10);
+ }
+
+ @Test
+ public void toByteBufferWithStartPositionAndChunksReturnsCorrectByteBuffer() throws IOException {
+ HeapDataOutputStream heapDataOutputStream = new HeapDataOutputStream(SMALLEST_CHUNK_SIZE, null);
+ heapDataOutputStream.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
+ byte[] chunk = new byte[SMALLEST_CHUNK_SIZE];
+ for (byte i = 0; i < SMALLEST_CHUNK_SIZE; i++) {
+ chunk[i] = i;
+ }
+ heapDataOutputStream.write(chunk);
+
+ ByteBuffer result = heapDataOutputStream.toByteBuffer(9);
+
+ assertThat(result.remaining()).isEqualTo(SMALLEST_CHUNK_SIZE + 1);
+ assertThat(result.get(0)).isEqualTo((byte) 10);
+ for (byte i = 0; i < SMALLEST_CHUNK_SIZE; i++) {
+ assertThat(result.get(i + 1)).isEqualTo(i);
+ }
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/pdx/internal/PdxWriterImplTest.java b/geode-core/src/test/java/org/apache/geode/pdx/internal/PdxWriterImplTest.java
deleted file mode 100644
index 6a2068f..0000000
--- a/geode-core/src/test/java/org/apache/geode/pdx/internal/PdxWriterImplTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.geode.pdx.internal;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.test.junit.categories.SerializationTest;
-
-@Category({SerializationTest.class})
-public class PdxWriterImplTest {
-
- @Test
- public void copyingEmptySourceReturnsEmptyResult() {
- ByteBuffer source = ByteBuffer.allocate(10);
- source.limit(0);
-
- ByteBuffer result = PdxWriterImpl.copyRemainingBytes(source);
-
- assertThat(source.position()).isEqualTo(0);
- assertThat(source.limit()).isEqualTo(0);
- assertThat(result.capacity()).isEqualTo(0);
- assertThat(result.remaining()).isEqualTo(0);
- }
-
- @Test
- public void copyingFullSourceReturnsFullResult() {
- ByteBuffer source = ByteBuffer.allocate(10);
- source.put((byte) 1);
- source.put((byte) 2);
- source.put((byte) 3);
- source.flip();
-
- ByteBuffer result = PdxWriterImpl.copyRemainingBytes(source);
-
- assertThat(source.position()).isEqualTo(0);
- assertThat(source.limit()).isEqualTo(3);
- assertThat(result.remaining()).isEqualTo(3);
- assertThat(result.capacity()).isEqualTo(3);
- assertThat(result.get(0)).isEqualTo((byte) 1);
- assertThat(result.get(1)).isEqualTo((byte) 2);
- assertThat(result.get(2)).isEqualTo((byte) 3);
- }
-
- @Test
- public void copyingPartialSourceReturnsPartialResult() {
- ByteBuffer source = ByteBuffer.allocate(10);
- source.put((byte) 1);
- source.put((byte) 2);
- source.put((byte) 3);
- source.put((byte) 4);
- source.position(1);
- source.limit(3);
-
- ByteBuffer result = PdxWriterImpl.copyRemainingBytes(source);
-
- assertThat(source.position()).isEqualTo(1);
- assertThat(source.limit()).isEqualTo(3);
- assertThat(result.remaining()).isEqualTo(2);
- assertThat(result.capacity()).isEqualTo(2);
- assertThat(result.get(0)).isEqualTo((byte) 2);
- assertThat(result.get(1)).isEqualTo((byte) 3);
- }
-
-}