You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/22 19:36:26 UTC

[64/83] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
index 0000000,8a7d351..cfc05f2
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ByteBufferInputStream.java
@@@ -1,0 -1,1022 +1,1022 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.tcp;
+ 
+ import java.io.DataInput;
+ import java.io.DataInputStream;
+ import java.io.DataOutput;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.ObjectInput;
+ import java.io.ObjectOutput;
+ import java.lang.reflect.InvocationTargetException;
+ import java.lang.reflect.Method;
+ import java.nio.BufferUnderflowException;
+ import java.nio.ByteBuffer;
+ import java.nio.ByteOrder;
+ 
+ import com.gemstone.gemfire.internal.ByteBufferWriter;
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
 -import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+ 
+ /**
+  * <p>
+  * ByteBufferInputStream is an input stream for ByteBuffer objects. It's
+  * incredible that the jdk doesn't have one of these already.
+  * </p>
+  * 
+  * The methods in this class throw BufferUnderflowException, not EOFException,
+  * if the end of the buffer is reached before we read the full amount. That
+  * breaks the contract for InputStream and DataInput, but it works for our code.
+  * 
+  * @author Dan Smith
+  * @author Bruce Schuchardt
+  * @author Darrel Schneider
+  * @since 3.0
+  */
+ 
+ public class ByteBufferInputStream extends InputStream implements DataInput, java.io.Externalizable
+ {
+   /**
+    * This interface is used to wrap either a ByteBuffer or an offheap Chunk
+    * as the source of bytes for a ByteBufferInputStream.
+    * 
+    * @author dschneider
+    *
+    */
+   public static interface ByteSource {
+     int position();
+     int limit();
+     int capacity();
+     int remaining();
+ 
+     void position(int newPosition);
+     void limit(int endOffset);
+ 
+     void get(byte[] b);
+     void get(byte[] b, int off, int len);
+     byte get();
+     byte get(int pos);
+     short getShort();
+     short getShort(int pos);
+     char getChar();
+     char getChar(int pos);
+     int getInt();
+     int getInt(int pos);
+     long getLong();
+     long getLong(int pos);
+     float getFloat();
+     float getFloat(int pos);
+     double getDouble();
+     double getDouble(int pos);
+ 
+     boolean hasArray();
+     byte[] array();
+     int arrayOffset();
+ 
+     ByteSource duplicate();
+     ByteSource slice(int length);
+     ByteSource slice(int pos, int limit);
+     
+     /**
+      * Returns the ByteBuffer that this ByteSource wraps; null if no ByteBuffer
+      */
+     ByteBuffer getBackingByteBuffer();
+ 
+     void sendTo(ByteBuffer out);
+     void sendTo(DataOutput out) throws IOException;
+   }
+   
+   public static class ByteSourceFactory {
+     public static ByteSource wrap(byte[] bytes) {
+       return new ByteBufferByteSource(ByteBuffer.wrap(bytes));
+     }
+     public static ByteSource create(ByteBuffer bb) {
+       return new ByteBufferByteSource(bb);
+     }
 -    public static ByteSource create(Chunk chunk) {
++    public static ByteSource create(ObjectChunk chunk) {
+       // Since I found a way to create a DirectByteBuffer (using reflection) from a Chunk
+       // we might not even need the ByteSource abstraction any more.
+       // But it is possible that createByteBuffer will not work on a different jdk so keep it for now.
+       ByteBuffer bb = chunk.createDirectByteBuffer();
+       if (bb != null) {
+         return create(bb);
+       } else {
+         return new OffHeapByteSource(chunk);
+       }
+     }
+   }
+   
+   public static class ByteBufferByteSource implements ByteSource {
+     private final ByteBuffer bb;
+     public ByteBufferByteSource(ByteBuffer bb) {
+       this.bb = bb;
+     }
+     /**
+      * Returns the current hash code of this byte source.
+      *
+      * <p> The hash code of a byte source depends only upon its remaining
+      * elements; that is, upon the elements from <tt>position()</tt> up to, and
+      * including, the element at <tt>limit()</tt>&nbsp;-&nbsp;<tt>1</tt>.
+      *
+      * <p> Because byte source hash codes are content-dependent, it is inadvisable
+      * to use byte sources as keys in hash maps or similar data structures unless it
+      * is known that their contents will not change.  </p>
+      *
+      * @return  The current hash code of this byte source
+      */
+     @Override
+     public int hashCode() {
+       int h = 1;
+       int p = position();
+       for (int i = limit() - 1; i >= p; i--) {
+         h = 31 * h + (int)get(i);
+       }
+       return h;
+     }
+     @Override
+     public boolean equals(Object ob) {
+       if (this == ob) {
+         return true;
+       }
+       if (!(ob instanceof ByteSource)) {
+         return false;
+       }
+       ByteSource that = (ByteSource)ob;
+       if (this.remaining() != that.remaining()) {
+         return false;
+       }
+       int p = this.position();
+       for (int i = this.limit() - 1, j = that.limit() - 1; i >= p; i--, j--) {
+         if (this.get(i) != that.get(j)) {
+           return false;
+         }
+       }
+       return true;
+     }
+ 
+     @Override
+     public ByteSource duplicate() {
+       return ByteSourceFactory.create(this.bb.duplicate());
+     }
+     @Override
+     public byte get() {
+       return this.bb.get();
+     }
+     @Override
+     public void get(byte[] b, int off, int len) {
+       this.bb.get(b, off, len);
+     }
+     @Override
+     public int remaining() {
+       return this.bb.remaining();
+     }
+     @Override
+     public int position() {
+       return this.bb.position();
+     }
+     @Override
+     public byte get(int pos) {
+       return this.bb.get(pos);
+     }
+     @Override
+     public char getChar() {
+       return this.bb.getChar();
+     }
+     @Override
+     public char getChar(int pos) {
+       return this.bb.getChar(pos);
+     }
+     @Override
+     public double getDouble() {
+       return this.bb.getDouble();
+     }
+     @Override
+     public double getDouble(int pos) {
+       return this.bb.getDouble(pos);
+     }
+     @Override
+     public float getFloat() {
+       return this.bb.getFloat();
+     }
+     @Override
+     public float getFloat(int pos) {
+       return this.bb.getFloat(pos);
+     }
+     @Override
+     public void get(byte[] b) {
+       this.bb.get(b);
+     }
+     @Override
+     public int getInt() {
+       return this.bb.getInt();
+     }
+     @Override
+     public int getInt(int pos) {
+       return this.bb.getInt(pos);
+     }
+     @Override
+     public long getLong() {
+       return this.bb.getLong();
+     }
+     @Override
+     public long getLong(int pos) {
+       return this.bb.getLong(pos);
+     }
+     @Override
+     public short getShort() {
+       return this.bb.getShort();
+     }
+     @Override
+     public short getShort(int pos) {
+       return this.bb.getShort(pos);
+     }
+     @Override
+     public int limit() {
+       return this.bb.limit();
+     }
+     @Override
+     public void position(int newPosition) {
+       this.bb.position(newPosition);
+     }
+     @Override
+     public boolean hasArray() {
+       return this.bb.hasArray();
+     }
+     @Override
+     public byte[] array() {
+       return this.bb.array();
+     }
+     @Override
+     public int arrayOffset() {
+       return this.bb.arrayOffset();
+     }
+     @Override
+     public void limit(int endOffset) {
+       this.bb.limit(endOffset);
+     }
+     @Override
+     public ByteSource slice(int length) {
+       if (length < 0) {
+         throw new IllegalArgumentException();
+       }
+       ByteBuffer dup = this.bb.duplicate();
+       dup.limit(dup.position() + length);
+       return ByteSourceFactory.create(dup.slice());
+     }
+     @Override
+     public ByteSource slice(int pos, int limit) {
+       ByteBuffer dup = this.bb.duplicate();
+       dup.limit(limit);
+       dup.position(pos);
+       return ByteSourceFactory.create(dup.slice());
+     }
+     @Override
+     public int capacity() {
+       return this.bb.capacity();
+     }
+     @Override
+     public void sendTo(ByteBuffer out) {
+       out.put(this.bb);
+     }
+     @Override
+     public void sendTo(DataOutput out) throws IOException {
+       int len = remaining();
+       if (len == 0) return;
+       if (out instanceof ByteBufferWriter) {
+         ((ByteBufferWriter) out).write(this.bb);
+         return;
+       }
+       if (this.bb.hasArray()) {
+         byte[] bytes = this.bb.array();
+         int offset = this.bb.arrayOffset() + this.bb.position();
+         out.write(bytes, offset, len);
+         this.bb.position(this.bb.limit());
+       } else {
+         while (len > 0) {
+           out.writeByte(get());
+           len--;
+         }
+       }
+     }
+     @Override
+     public ByteBuffer getBackingByteBuffer() {
+       return this.bb;
+     }
+   }
+   
+   public static class OffHeapByteSource implements ByteSource {
+     private int position;
+     private int limit;
 -    private final Chunk chunk;
++    private final ObjectChunk chunk;
+ 
 -    public OffHeapByteSource(Chunk c) {
++    public OffHeapByteSource(ObjectChunk c) {
+       this.chunk = c;
+       this.position = 0;
+       this.limit = capacity();
+     }
+     private OffHeapByteSource(OffHeapByteSource other) {
+       this.chunk = other.chunk;
+       this.position = other.position;
+       this.limit = other.limit;
+     }
+     
+     /**
+      * Returns the current hash code of this byte source.
+      *
+      * <p> The hash code of a byte source depends only upon its remaining
+      * elements; that is, upon the elements from <tt>position()</tt> up to, and
+      * including, the element at <tt>limit()</tt>&nbsp;-&nbsp;<tt>1</tt>.
+      *
+      * <p> Because byte source hash codes are content-dependent, it is inadvisable
+      * to use byte sources as keys in hash maps or similar data structures unless it
+      * is known that their contents will not change.  </p>
+      *
+      * @return  The current hash code of this byte source
+      */
+     @Override
+     public int hashCode() {
+       int h = 1;
+       int p = position();
+       for (int i = limit() - 1; i >= p; i--) {
+         h = 31 * h + (int)get(i);
+       }
+       return h;
+     }
+      
+     @Override
+     public boolean equals(Object ob) {
+       if (this == ob) {
+         return true;
+       }
+       if (!(ob instanceof ByteSource)) {
+         return false;
+       }
+       ByteSource that = (ByteSource)ob;
+       if (this.remaining() != that.remaining()) {
+         return false;
+       }
+       int p = this.position();
+       for (int i = this.limit() - 1, j = that.limit() - 1; i >= p; i--, j--) {
+         if (this.get(i) != that.get(j)) {
+           return false;
+         }
+       }
+       return true;
+     }
+     
+     @Override
+     public int remaining() {
+       return this.limit - this.position;
+     }
+ 
+     @Override
+     public int position() {
+       return this.position;
+     }
+ 
+     @Override
+     public int limit() {
+       return this.limit;
+     }
+ 
+     @Override
+     public void position(int newPosition) {
+       if ((newPosition > this.limit) || (newPosition < 0)) {
+         throw new IllegalArgumentException();
+       }
+       this.position = newPosition;
+     }
+     
+     @Override
+     public void limit(int newLimit) {
+       if ((newLimit > capacity()) || (newLimit < 0)) {
+         throw new IllegalArgumentException();
+       }
+       this.limit = newLimit;
+       if (this.position > this.limit) {
+         this.position = this.limit;
+       }
+     }
+     
+     @Override
+     public int capacity() {
+       return this.chunk.getDataSize();
+     }
+ 
+     private final int nextGetIndex() {
+       int p = this.position;
+       if (p >= this.limit) {
+         throw new BufferUnderflowException();
+       }
+       this.position += 1;
+       return p;
+     }
+ 
+     private final int nextGetIndex(int nb) {
+       int p = this.position;
+       if (this.limit - p < nb) {
+         throw new BufferUnderflowException();
+       }
+       this.position += nb;
+       return p;
+     }
+ 
+     /**
+      * Checks the given index against the limit, throwing an {@link
+      * IndexOutOfBoundsException} if it is not smaller than the limit
+      * or is smaller than zero.
+      */
+     private final void checkIndex(int i) {
+       if ((i < 0) || (i >= this.limit)) {
+         throw new IndexOutOfBoundsException();
+       }
+     }
+ 
+     private final void checkIndex(int i, int nb) {
+       if ((i < 0) || (nb > this.limit - i)) {
+         throw new IndexOutOfBoundsException();
+       }
+     }
+     private static void checkBounds(int off, int len, int size) {
+       if ((off | len | (off + len) | (size - (off + len))) < 0) {
+         throw new IndexOutOfBoundsException();
+       }
+     }
+     
+     @Override
+     public void get(byte[] b) {
+       basicGet(b, 0, b.length);
+     }
+     @Override
+     public void get(byte[] dst, int offset, int length) {
+       checkBounds(offset, length, dst.length);
+       basicGet(dst, offset, length);
+     }
+     private void basicGet(byte[] dst, int offset, int length) {
+       if (length > remaining()) {
+         throw new BufferUnderflowException();
+       }
+       int p = this.position;
+       this.position += length;
+       this.chunk.readBytes(p, dst, offset, length);
+     }
+ 
+     @Override
+     public byte get() {
+       return this.chunk.readByte(nextGetIndex());
+     }
+     @Override
+     public byte get(int pos) {
+       checkIndex(pos);
+       return this.chunk.readByte(pos);
+     }
+ 
+     /**
+      * Return true if the hardware supported unaligned reads from memory.
+      */
+     private static boolean determineUnaligned() {
+       try {
+         Class c = Class.forName("java.nio.Bits");
+         Method m = c.getDeclaredMethod("unaligned");
+         m.setAccessible(true);
+         return (boolean) m.invoke(null);
+       } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+         return false;
+         //throw new IllegalStateException("Could not invoke java.nio.Bits.unaligned()", e);
+       }
+     }
+     private static final boolean unaligned = determineUnaligned();
+     
+     @Override
+     public short getShort() {
+       return basicGetShort(this.nextGetIndex(2));
+     }
+     @Override
+     public short getShort(int pos) {
+       this.checkIndex(pos, 2);
+       return basicGetShort(pos);
+     }
+     private short basicGetShort(int pos) {
+       long addr = this.chunk.getAddressForReading(pos, 2);
+       if (unaligned) {
+         short result = UnsafeMemoryChunk.readAbsoluteShort(addr);
+         if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
+           result = Short.reverseBytes(result);
+         }
+         return result;
+       } else {
+         int ch1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         int ch2 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+         return (short)((ch1 << 8) + (ch2 << 0));
+       }
+     }
+ 
+     @Override
+     public char getChar() {
+       return basicGetChar(this.nextGetIndex(2));
+     }
+     @Override
+     public char getChar(int pos) {
+       this.checkIndex(pos, 2);
+       return basicGetChar(pos);
+     }
+     private char basicGetChar(int pos) {
+       long addr = this.chunk.getAddressForReading(pos, 2);
+       if (unaligned) {
+         char result = UnsafeMemoryChunk.readAbsoluteChar(addr);
+         if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
+           result = Character.reverseBytes(result);
+         }
+         return result;
+       } else {
+         int ch1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         int ch2 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+         return (char)((ch1 << 8) + (ch2 << 0));
+       }
+     }
+ 
+     @Override
+     public int getInt() {
+       return basicGetInt(this.nextGetIndex(4));
+     }
+     @Override
+     public int getInt(int pos) {
+       this.checkIndex(pos, 4);
+       return basicGetInt(pos);
+     }
+     
+     private int basicGetInt(final int pos) {
+       long addr = this.chunk.getAddressForReading(pos, 4);
+       if (unaligned) {
+         int result = UnsafeMemoryChunk.readAbsoluteInt(addr);
+         if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
+           result = Integer.reverseBytes(result);
+         }
+         return result;
+       } else {
+         byte b0 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b2 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b3 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+         return (b0 << 24) + ((b1 & 255) << 16) + ((b2 & 255) << 8) + ((b3 & 255) << 0);
+       }
+     }
+ 
+     @Override
+     public long getLong() {
+       return basicGetLong(this.nextGetIndex(8));
+     }
+     @Override
+     public long getLong(int pos) {
+       this.checkIndex(pos, 8);
+       return basicGetLong(pos);
+     }
+     private long basicGetLong(final int pos) {
+       long addr = this.chunk.getAddressForReading(pos, 8);
+       if (unaligned) {
+         long result = UnsafeMemoryChunk.readAbsoluteLong(addr);
+         if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
+           result = Long.reverseBytes(result);
+         }
+         return result;
+       } else {
+         byte b0 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b1 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b2 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b3 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b4 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b5 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b6 = UnsafeMemoryChunk.readAbsoluteByte(addr++);
+         byte b7 = UnsafeMemoryChunk.readAbsoluteByte(addr);
+         return (((long)b0 << 56) +
+             ((long)(b1 & 255) << 48) +
+             ((long)(b2 & 255) << 40) +
+             ((long)(b3 & 255) << 32) +
+             ((long)(b4 & 255) << 24) +
+             ((b5 & 255) << 16) +
+             ((b6 & 255) <<  8) +
+             ((b7 & 255) <<  0));
+       }
+     }
+ 
+     @Override
+     public float getFloat() {
+       return basicGetFloat(this.nextGetIndex(4));
+     }
+     @Override
+     public float getFloat(int pos) {
+       this.checkIndex(pos, 4);
+       return basicGetFloat(pos);
+    }
+     private float basicGetFloat(int pos) {
+       return Float.intBitsToFloat(basicGetInt(pos));
+     }
+ 
+     @Override
+     public double getDouble() {
+       return basicGetDouble(this.nextGetIndex(8));
+     }
+     @Override
+     public double getDouble(int pos) {
+       this.checkIndex(pos, 8);
+       return basicGetDouble(pos);
+     }
+     private double basicGetDouble(int pos) {
+       return Double.longBitsToDouble(basicGetLong(pos));
+     }
+ 
+     @Override
+     public boolean hasArray() {
+       return false;
+     }
+ 
+     @Override
+     public byte[] array() {
+       throw new UnsupportedOperationException();
+     }
+ 
+     @Override
+     public int arrayOffset() {
+       throw new UnsupportedOperationException();
+     }
+ 
+     @Override
+     public ByteSource duplicate() {
+       return new OffHeapByteSource(this);
+     }
+ 
+     @Override
+     public ByteSource slice(int length) {
+       if (length < 0) {
+         throw new IllegalArgumentException();
+       }
+       return slice(this.position, this.position + length);
+     }
+ 
+     @Override
+     public ByteSource slice(int pos, int limit) {
+       if ((limit > capacity()) || (limit < 0)) {
+         throw new IllegalArgumentException();
+       }
+       if ((pos > limit) || (pos < 0)) {
+         throw new IllegalArgumentException();
+       }
+       return new OffHeapByteSource(this.chunk.slice(pos, limit));
+     }
+ 
+     @Override
+     public void sendTo(ByteBuffer out) {
+       int len = remaining();
+       while (len > 0) {
+         out.put(get());
+         len--;
+       }
+       // We will not even create an instance of this class if createByteBuffer works on this platform.
+ //      if (len > 0) {
+ //        ByteBuffer bb = this.chunk.createByteBuffer();
+ //        bb.position(position());
+ //        bb.limit(limit());
+ //        out.put(bb);
+ //        position(limit());
+ //      }
+     }
+     
+     @Override
+     public void sendTo(DataOutput out) throws IOException {
+       int len = remaining();
+       while (len > 0) {
+         out.writeByte(get());
+         len--;
+       }
+     }
+     @Override
+     public ByteBuffer getBackingByteBuffer() {
+       return null;
+     }
+   }
+   
+   private ByteSource buffer;
+ 
+   public ByteBufferInputStream(ByteBuffer buffer) {
+     setBuffer(buffer);
+   }
+   
+   public ByteBufferInputStream() {
+   }
+   
+   protected ByteBufferInputStream(ByteBufferInputStream copy) {
+     this.buffer = copy.buffer.duplicate();
+   }
+ 
 -  public ByteBufferInputStream(Chunk blob) {
++  public ByteBufferInputStream(ObjectChunk blob) {
+     this.buffer = ByteSourceFactory.create(blob);
+   }
+ 
+   public final void setBuffer(ByteSource buffer) {
+     if(buffer == null) {
+       throw new NullPointerException();
+     }
+     this.buffer = buffer;
+   }
+   
+   public final void setBuffer(ByteBuffer bb) {
+     if (bb == null) {
+       throw new NullPointerException();
+     }
+     setBuffer(ByteSourceFactory.create(bb));
+   }
+   
+   /**
+    * See the InputStream read method for javadocs.
+    * Note that if an attempt
+    * to read past the end of the wrapped ByteBuffer is done this method
+    * throws BufferUnderflowException
+    */
+   @Override
+   public final int read() {
+     return (buffer.get() & 0xff);
+   }
+   
+ 
+   /* this method is not thread safe
+    * See the InputStream read method for javadocs.
+    * Note that if an attempt
+    * to read past the end of the wrapped ByteBuffer is done this method
+    * throws BufferUnderflowException
+    */
+   @Override
+   public final int read(byte b[], int off, int len) {
+     buffer.get(b, off, len);
+     return len;
+   }
+ 
+   @Override
+   public int available() {
+     return this.buffer.remaining();
+   }
+   
+   public int position() {
+     return this.buffer.position();
+   }
+ 
+   // GemFire does not use mark or reset so I changed this class
+   // to just inherit from InputStream which does not support mark/reset.
+   // That way we do not need to add support for them to the new ByteSource class.
+   
+ //  @Override
+ //  public boolean markSupported() {
+ //    return true;
+ //  }
+ //
+ //  @Override
+ //  public void mark(int limit) {
+ //    this.buffer.mark();
+ //  }
+ //
+ //  @Override
+ //  public void reset() {
+ //    this.buffer.reset();
+ //  }
+ 
+   @Override
+   public long skip(long n) throws IOException {
+     if (n <= Integer.MAX_VALUE) {
+       return skipBytes((int) n);
+     } else {
+       return super.skip(n);
+     }
+   }
+ 
+   public boolean readBoolean() {
+     return this.buffer.get() != 0;
+   }
+   public boolean readBoolean(int pos) {
+     return this.buffer.get(pos) != 0;
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readByte()
+    */
+   public byte readByte() {
+     return this.buffer.get();
+   }
+   public byte readByte(int pos) {
+     return this.buffer.get(pos);
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readChar()
+    */
+   public char readChar() {
+     return this.buffer.getChar();
+   }
+   public char readChar(int pos) {
+     return this.buffer.getChar(pos);
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readDouble()
+    */
+   public double readDouble() {
+     return this.buffer.getDouble();
+   }
+   public double readDouble(int pos) {
+     return this.buffer.getDouble(pos);
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readFloat()
+    */
+   public float readFloat() {
+     return this.buffer.getFloat();
+   }
+   public float readFloat(int pos) {
+     return this.buffer.getFloat(pos);
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readFully(byte[])
+    */
+   public void readFully(byte[] b) {
+     this.buffer.get(b);
+     
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readFully(byte[], int, int)
+    */
+   public void readFully(byte[] b, int off, int len) {
+     this.buffer.get(b, off, len);
+     
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readInt()
+    */
+   public int readInt() {
+     return this.buffer.getInt();
+   }
+   public int readInt(int pos) {
+     return this.buffer.getInt(pos);
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readLine()
+    */
+   public String readLine() {
+     throw new UnsupportedOperationException();
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readLong()
+    */
+   public long readLong() {
+     return this.buffer.getLong();
+   }
+   public long readLong(int pos) {
+     return this.buffer.getLong(pos);
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readShort()
+    */
+   public short readShort() {
+     return this.buffer.getShort();
+   }
+   public short readShort(int pos) {
+     return this.buffer.getShort(pos);
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readUTF()
+    */
+   public String readUTF() throws IOException {
+     return DataInputStream.readUTF(this);
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readUnsignedByte()
+    */
+   public int readUnsignedByte() {
+     return this.buffer.get() & 0xff;
+   }
+   public int readUnsignedByte(int pos) {
+     return this.buffer.get(pos) & 0xff;
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#readUnsignedShort()
+    */
+   public int readUnsignedShort() {
+     return this.buffer.getShort() & 0xffff;
+   }
+   public int readUnsignedShort(int pos) {
+     return this.buffer.getShort(pos) & 0xffff;
+   }
+ 
+   /* (non-Javadoc)
+    * @see java.io.DataInput#skipBytes(int)
+    */
+   public int skipBytes(int n) {
+     int newPosition = this.buffer.position() + n;
+     if(newPosition > this.buffer.limit()) {
+       newPosition = this.buffer.limit();
+       n = newPosition - this.buffer.position();
+     }
+     this.buffer.position(newPosition);
+     return n;
+   }
+ 
+   public int size() {
+     return this.buffer.limit();
+   }
+ 
+   public byte get(int idx) {
+     return this.buffer.get(idx);
+   }
+ 
+   public short getShort(int idx) {
+     return this.buffer.getShort(idx);
+   }
+ 
+   public int getInt(int idx) {
+     return this.buffer.getInt(idx);
+   }
+ 
+   public void position(int absPos) {
+ //    if (absPos < 0) {
+ //      throw new IllegalArgumentException("position was less than zero " + absPos);
+ //    } else if (absPos > this.buffer.limit()) {
+ //      throw new IllegalArgumentException( "position " + absPos + " was greater than the limit " + this.buffer.limit());
+ //    }
+     this.buffer.position(absPos);
+   }
+ 
+   public void sendTo(DataOutput out) throws IOException {
+     this.buffer.position(0);
+     this.buffer.sendTo(out);
+  }
+   
+   public void sendTo(ByteBuffer out) {
+     this.buffer.position(0);
+     this.buffer.sendTo(out);
+  }
+ 
+   public ByteSource slice(int length) {
+     return this.buffer.slice(length);
+   }
+   
+   public ByteSource slice(int startOffset, int endOffset) {
+     return this.buffer.slice(startOffset, endOffset);
+   }
+ 
+   public void writeExternal(ObjectOutput out) throws IOException {
+     out.writeBoolean(this.buffer != null);
+     if (this.buffer != null) {
+       out.writeInt(this.buffer.capacity());
+       out.writeInt(this.buffer.limit());
+       out.writeInt(this.buffer.position());
+       for (int i=0; i < this.buffer.capacity(); i++) {
+         out.write(this.buffer.get(i));
+       }
+     }
+   }
+ 
+   public void readExternal(ObjectInput in) throws IOException,
+       ClassNotFoundException {
+     boolean hasBuffer = in.readBoolean();
+     if (hasBuffer) {
+       int capacity = in.readInt();
+       int limit = in.readInt();
+       int position = in.readInt();
+       byte[] bytes = new byte[capacity];
+       int bytesRead = in.read(bytes);
+       if (bytesRead != capacity) {
+         throw new IOException("Expected to read " + capacity + " bytes but only read " + bytesRead + " bytes.");
+       }
+       setBuffer(ByteBuffer.wrap(bytes, position, limit-position));
+     } else {
+       this.buffer = null;
+     }
+   }
+ 
+   public ByteSource getBuffer() {
+     return buffer;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
index 0000000,52f332f..d632158
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/tcp/ImmutableByteBufferInputStream.java
@@@ -1,0 -1,85 +1,85 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.tcp;
+ 
+ import java.nio.ByteBuffer;
+ 
 -import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ 
+ /**
+  * You should only create an instance of this class if the bytes this buffer reads
+  * will never change. If you want a buffer than can be refilled with other bytes then
+  * create an instance of ByteBufferInputStream instead.
+  * Note that even though this class is immutable the position on its ByteBuffer can change.
+  * 
+  * @author darrel
+  * @since 6.6
+  */
+ public class ImmutableByteBufferInputStream extends ByteBufferInputStream {
+ 
+   /**
+    * Create an immutable input stream by whose contents are the first length
+    * bytes from the given input stream.
+    * @param existing the input stream whose content will go into this stream. Note that this existing stream will be read by this class (a copy is not made) so it should not be changed externally.
+    * @param length the number of bytes to put in this stream
+    */
+   public ImmutableByteBufferInputStream(ByteBufferInputStream existing,
+       int length) {
+     setBuffer(existing.slice(length));
+   }
+   /**
+    * Create an immutable input stream whose contents are the given bytes
+    * @param bytes the content of this stream. Note that this byte array will be read by this class (a copy is not made) so it should not be changed externally.
+    */
+   public ImmutableByteBufferInputStream(byte[] bytes) {
+     setBuffer(ByteBuffer.wrap(bytes));
+   }
+ 
+   /**
+    * Create an immutable input stream whose contents are the given bytes
+    * @param bb the content of this stream. Note that bb will be read by this class (a copy is not made) so it should not be changed externally.
+    */
+   public ImmutableByteBufferInputStream(ByteBuffer bb) {
+     setBuffer(bb.slice());
+   }
+   /**
+    * Create an immutable input stream by copying another. A somewhat shallow copy is made.
+    * @param copy the input stream to copy. Note that this copy stream will be read by this class (a copy is not made) so it should not be changed externally.
+    */
+   public ImmutableByteBufferInputStream(ImmutableByteBufferInputStream copy) {
+     super(copy);
+   }
+   public ImmutableByteBufferInputStream() {
+     // for serialization
+   }
+   
 -  public ImmutableByteBufferInputStream(Chunk blob) {
++  public ImmutableByteBufferInputStream(ObjectChunk blob) {
+     super(blob);
+   }
+   @Override
+   public boolean markSupported() {
+     return false;
+   }
+   @Override
+   public void mark(int limit) {
+     // unsupported but exception thrown by reset
+   }
+   @Override
+   public void reset() {
+     throw new UnsupportedOperationException();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
index 0000000,7a4840e..40015a4
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/util/BlobHelper.java
@@@ -1,0 -1,195 +1,195 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package com.gemstone.gemfire.internal.util;
+ 
+ import java.io.IOException;
+ 
+ import com.gemstone.gemfire.DataSerializer;
+ import com.gemstone.gemfire.distributed.internal.DMStats;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.internal.ByteArrayDataInput;
+ import com.gemstone.gemfire.internal.DSCODE;
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 -import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.pdx.internal.PdxInputStream;
+ 
+ /**
+  * A "blob" is a serialized representation of an object into a byte[].
+  * BlobHelper provides utility methods for
+  * serializing and deserializing the object.
+  * 
+  * 
+  */
+ 
+ public class BlobHelper {
+ 
+   /**
+    * A blob is a serialized Object. This method serializes the object into a
+    * blob and returns the byte array that contains the blob.
+    */
+   public static byte[] serializeToBlob(Object obj) throws IOException {
+     return serializeToBlob(obj, null);
+   }
+ 
+   /**
+    * A blob is a serialized Object.  This method serializes the
+    * object into a blob and returns the byte array that contains the blob.
+    */
+   public static byte[] serializeToBlob(Object obj, Version version)
+   throws IOException
+   {
+     final long start = startSerialization();
+     HeapDataOutputStream hdos = new HeapDataOutputStream(version);
+     DataSerializer.writeObject(obj, hdos);
+     byte[] result = hdos.toByteArray();
+     endSerialization(start, result.length);
+     return result;
+   }
+ 
+   /**
+    * A blob is a serialized Object.  This method serializes the
+    * object into the given HeapDataOutputStream.
+    */
+   public static void serializeTo(Object obj, HeapDataOutputStream hdos)
+     throws IOException
+   {
+     final int startBytes = hdos.size();
+     final long start = startSerialization();
+     DataSerializer.writeObject(obj, hdos);
+     endSerialization(start, hdos.size()-startBytes);
+   }
+                                                                         
+ 
+ 
+   /**
+    * A blob is a serialized Object.  This method 
+    * returns the deserialized object.
+    */
+   public static Object deserializeBlob(byte[] blob) throws IOException,
+       ClassNotFoundException {
+     return deserializeBlob(blob, null, null);
+   }
+ 
+   /**
+    * A blob is a serialized Object.  This method 
+    * returns the deserialized object.
+    */
+   public static Object deserializeBlob(byte[] blob, Version version,
+       ByteArrayDataInput in) throws IOException, ClassNotFoundException {
+     Object result;
+     final long start = startDeserialization();
+     /*
+     final StaticSystemCallbacks sysCb;
+     if (version != null && (sysCb = GemFireCacheImpl.FactoryStatics
+         .systemCallbacks) != null) {
+       // may need to change serialized shape for SQLFire
+       result = sysCb.fromVersion(blob, true, version, in);
+     }
+     else*/ if (blob.length > 0 && blob[0] == DSCODE.PDX) {
+       // If the first byte of blob indicates a pdx then wrap
+       // blob in a PdxInputStream instead.
+       // This will prevent us from making a copy of the byte[]
+       // every time we deserialize a PdxInstance.
+       PdxInputStream is = new PdxInputStream(blob);
+       result = DataSerializer.readObject(is);
+     } else {
+       // if we have a nested pdx then we want to make a copy
+       // when a PdxInstance is created so that the byte[] will
+       // just have the pdx bytes and not the outer objects bytes.
+       if (in == null) {
+         in = new ByteArrayDataInput();
+       }
+       in.initialize(blob, version);
+       result = DataSerializer.readObject(in);
+     }
+     endDeserialization(start, blob.length);
+     // this causes a small performance drop in d-no-ack performance tests
+ //    if (dis.available() != 0) {
+ //      LogWriterI18n lw = InternalDistributedSystem.getLoggerI18n();
+ //      if (lw != null && lw.warningEnabled()) {
+ //        lw.warning(
+ //            LocalizedStrings.BlobHelper_DESERIALIZATION_OF_A_0_DID_NOT_READ_1_BYTES_THIS_INDICATES_A_LOGIC_ERROR_IN_THE_SERIALIZATION_CODE_FOR_THIS_CLASS,
+ //            new Object[] {((result!=null) ? result.getClass().getName() : "NULL"), Integer.valueOf(dis.available())});   
+ //            
+ //      }
+ //    }
+     return result;
+   }
+ 
+   /**
+    * A blob is a serialized Object.  This method 
+    * returns the deserialized object.
+    * If a PdxInstance is returned then it will refer to Chunk's off-heap memory
+    * with an unretained reference.
+    */
 -  public static @Unretained Object deserializeOffHeapBlob(Chunk blob) throws IOException, ClassNotFoundException {
++  public static @Unretained Object deserializeOffHeapBlob(ObjectChunk blob) throws IOException, ClassNotFoundException {
+     Object result;
+     final long start = startDeserialization();
+     // For both top level and nested pdxs we just want a reference to this off-heap blob.
+     // No copies.
+     // For non-pdx we want a stream that will read directly from the chunk.
+     PdxInputStream is = new PdxInputStream(blob);
+     result = DataSerializer.readObject(is);
+     endDeserialization(start, blob.getDataSize());
+     return result;
+   }
+ 
+   public static Object deserializeBuffer(ByteArrayDataInput in, int numBytes)
+       throws IOException, ClassNotFoundException {
+     final long start = startDeserialization();
+     Object result = DataSerializer.readObject(in);
+     endDeserialization(start, numBytes);
+     return result;
+   }
+ 
+   private static long startSerialization() {
+     long result = 0;
+     DMStats stats = InternalDistributedSystem.getDMStats();
+     if (stats != null) {
+       result = stats.startSerialization();
+     }
+     return result;
+   }
+   
+   private static void endSerialization(long start, int bytes) {
+     DMStats stats = InternalDistributedSystem.getDMStats();
+     if (stats != null) {
+       stats.endSerialization(start, bytes);
+     }
+   }
+ 
+   private static long startDeserialization() {
+     long result = 0;
+     DMStats stats = InternalDistributedSystem.getDMStats();
+     if (stats != null) {
+       result = stats.startDeserialization();
+     }
+     return result;
+   }
+   
+   private static void endDeserialization(long start, int bytes) {
+     DMStats stats = InternalDistributedSystem.getDMStats();
+     if (stats != null) {
+       stats.endDeserialization(start, bytes);
+     }
+   }
+   
+ }