You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by km...@apache.org on 2013/09/29 05:46:33 UTC

git commit: KNOX-157: Alternate implementation of HttpEntity supporting optimistically repeatability.

Updated Branches:
  refs/heads/master f7d10cde8 -> 17f278785


KNOX-157: Alternate implementation of HttpEntity supporting optimistically repeatability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-knox/commit/17f27878
Tree: http://git-wip-us.apache.org/repos/asf/incubator-knox/tree/17f27878
Diff: http://git-wip-us.apache.org/repos/asf/incubator-knox/diff/17f27878

Branch: refs/heads/master
Commit: 17f2787850682e727babfe368031c11c699f29a4
Parents: f7d10cd
Author: Kevin Minder <ke...@hortonworks.com>
Authored: Sat Sep 28 23:46:14 2013 -0400
Committer: Kevin Minder <ke...@hortonworks.com>
Committed: Sat Sep 28 23:46:14 2013 -0400

----------------------------------------------------------------------
 .../dispatch/PartiallyRepeatableHttpEntity.java | 162 ++++
 .../PartiallyRepeatableHttpEntityTest.java      | 874 +++++++++++++++++++
 2 files changed, 1036 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/17f27878/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntity.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntity.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntity.java
new file mode 100644
index 0000000..50fe15b
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntity.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.http.entity.HttpEntityWrapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+@NotThreadSafe
+public class PartiallyRepeatableHttpEntity extends HttpEntityWrapper {
+
+  public static final int DEFAULT_BUFFER_SIZE = 4096;
+
+  private int replayWriteIndex;
+  private int replayWriteLimit;
+  private byte[] replayBuffer;
+  private ReplayStream finalStream;
+  private InputStream wrappedStream;
+
+  public PartiallyRepeatableHttpEntity( final HttpEntity entity, int bufferSize ) throws IOException {
+    super( entity );
+    this.wrappedStream = null;
+    this.finalStream = null;
+    this.replayWriteIndex = -1;
+    if( !entity.isRepeatable() ) {
+      this.replayBuffer = new byte[ bufferSize ];
+      this.replayWriteLimit = bufferSize-1;
+    } else {
+      this.replayBuffer = null;
+    }
+  }
+
+  public PartiallyRepeatableHttpEntity( final HttpEntity entity ) throws IOException {
+    this( entity, DEFAULT_BUFFER_SIZE );
+  }
+
+  @Override
+  public boolean isRepeatable() {
+    return true;
+  }
+
+  @Override
+  public boolean isStreaming() {
+    return wrappedEntity.isStreaming();
+  }
+
+  @Override
+  public boolean isChunked() {
+    return wrappedEntity.isChunked();
+  }
+
+  @Override
+  public long getContentLength() {
+    return wrappedEntity.getContentLength();
+  }
+
+  // This will throw an IOException if an attempt is made to getContent a second time after
+  // more bytes than the buffer can hold has been read on the first stream.
+  @Override
+  public InputStream getContent() throws IOException {
+    // If the wrapped stream is repeatable return it directly.
+    if( replayBuffer == null ) {
+      return wrappedEntity.getContent();
+    // Else if the buffer has overflowed
+    } else if( finalStream != null ) {
+      throw new IOException( "TODO - Existing stream already past replay buffer capacity" );
+    } else {
+      if( wrappedStream == null ) {
+         wrappedStream = wrappedEntity.getContent();
+      }
+      return new ReplayStream();
+    }
+  }
+
+  @Override
+  public void writeTo( final OutputStream stream ) throws IOException {
+    IOUtils.copy( getContent(), stream );
+  }
+
+  @Override
+  public void consumeContent() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  private class ReplayStream extends InputStream {
+
+    private int replayReadIndex = -1;
+
+    @Override
+    public int read() throws IOException {
+      int b;
+      if( finalStream != null && finalStream != this ) {
+        throw new IOException( "TODO - Replay stream taken over by another consumer." );
+      }
+      // If we can read from the buffer do so.
+      if( replayReadIndex < replayWriteIndex ) {
+        b = replayBuffer[ ++replayReadIndex ];
+      } else {
+        b = wrappedStream.read();
+        // If the underlying stream is not closed.
+        if( b > -1 ) {
+          if( replayWriteIndex < replayWriteLimit ) {
+            replayBuffer[ ++replayWriteIndex ] = (byte)b;
+            replayReadIndex++;
+          } else {
+            finalStream = this;
+          }
+        }
+      }
+      return b;
+    }
+
+    public int read( byte buffer[], int offset, int limit ) throws IOException {
+      int count = -1;
+      if( finalStream != null && finalStream != this ) {
+        throw new IOException( "TODO - Replay stream taken over by another consumer." );
+      }
+      // If we can read from the buffer do so.
+      if( replayReadIndex < replayWriteIndex ) {
+        count = replayWriteIndex - replayReadIndex;
+        count = Math.min( limit, count );
+        System.arraycopy( replayBuffer, replayReadIndex+1, buffer, offset, count );
+        replayReadIndex += count;
+      } else {
+        count = wrappedStream.read( buffer, offset, limit );
+        // If the underlying stream is not closed.
+        if( count > -1 ) {
+          if( replayWriteIndex+count < replayWriteLimit ) {
+            System.arraycopy( buffer, offset, replayBuffer, replayWriteIndex+1, count );
+            replayReadIndex += count;
+            replayWriteIndex += count;
+          } else {
+            finalStream = this;
+          }
+        }
+      }
+      return count;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/17f27878/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntityTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntityTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntityTest.java
new file mode 100644
index 0000000..42528bb
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/PartiallyRepeatableHttpEntityTest.java
@@ -0,0 +1,874 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import org.apache.http.entity.BasicHttpEntity;
+import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.InputStreamEntity;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.fail;
+
+public class PartiallyRepeatableHttpEntityTest {
+
+  private static Charset UTF8 = Charset.forName( "UTF-8" );
+
+  // Variables
+  // Consumers: C1, C2
+  // Reads: FC - Full Content, PC - Partial Content, AC - Any Content
+  // Reads: IB - In Buffer, OB - Overflow Buffer
+  // Close: XC
+  // Expect: EE
+
+  // Test Cases
+  // C1 FC
+  //   C1 FC/IB.
+  //   C1 FC/OB.
+  //   C1 FC/IB; C2 FC.
+  //   C1 FC/OB; C2 AC; EE
+  //   C1 FC/IB; C1 XC; C2 FC.
+  //   C1 FC/OB; C1 XC; C2 AC; EE
+  // C1 PC
+  //   C1 PC/IB.
+  //   C1 PC/OB.
+  //   C1 PC/IB; C2 FC.
+  //   C1 PC/OB; C2 AC; EE
+  //   C1 PC/IB; C1 XC; C2 FC.
+  //   C1 PC/OB; C1 XC; C2 AC; EE
+  // C1 C2 C1
+  //   C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+  //   C1 PC/IB; C2 PC/OB; C1 AC; EE
+
+  @Test
+  public void testS__C1_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testB__C1_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testS__C1_FC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testB__C1_FC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testS_C1_FC_IB__C2_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testB_C1_FC_IB__C2_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testS_C1_FC_OB__C2_AC__EE() throws Exception {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testB_C1_FC_OB__C2_AC__EE() throws Exception {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 FC/IB; C1 XC; C2 FC.
+  @Test
+  public void testS_C1_FC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 FC/IB; C1 XC; C2 FC.
+  @Test
+  public void testB_C1_FC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 3 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 3 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 FC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testS_C1_FC_OB__C1_XC__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 FC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testB_C1_FC_OB__C1_XC__C2_AC_EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 3 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB.
+  @Test
+  public void testS_C1_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 3 );
+    assertThat( text, is( "012" ) );
+  }
+
+  //   C1 PC/IB.
+  @Test
+  public void testB_C1_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 3, 3 );
+    assertThat( text, is( "012" ) );
+  }
+
+  //   C1 PC/OB.
+  @Test
+  public void testS_C1_PC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+  }
+
+  //   C1 PC/OB.
+  @Test
+  public void testB_C1_PC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 4 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+  }
+
+  //   C1 PC/IB; C2 FC.
+  @Test
+  public void testS_C1_PC_IB__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 4 );
+    assertThat( text, is( "0123" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/IB; C2 FC.
+  @Test
+  public void testB_C1_PC_IB__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 4, 1 );
+    assertThat( text, is( "0123" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 7 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/OB; C2 AC; EE
+  @Test
+  public void testS_C1_PC_OB__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 7 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/OB; C2 AC; EE
+  @Test
+  public void testB_C1_PC_OB__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 7, 2 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB; C1 XC; C2 FC.
+  @Test
+  public void testS_C1_PC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 7 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/IB; C1 XC; C2 FC.
+  @Test
+  public void testB_C1_PC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 7, 2 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 7 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testS_C1_PC_OB__C1_XC__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 7 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testB_C1_PC_OB__C1_XC__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 7, 2 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    try {
+      replay.getContent();
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+  @Test
+  public void testS_C1_PC_IB__C2_PC_IB__C2_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+
+    stream1 = replay.getContent();
+    text = byteRead( stream1, 3 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = byteRead( stream2, 4 );
+    assertThat( text, is( "0123" ) );
+
+    text = byteRead( stream1, 3 );
+    assertThat( text, is( "345" ) );
+  }
+
+  //   C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+  @Test
+  public void testB_C1_PC_IB__C2_PC_IB__C2_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 20 );
+    stream1 = replay.getContent();
+    text = blockRead( stream1, UTF8, 3, 2 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = blockRead( stream2, UTF8, 4, 3 );
+    assertThat( text, is( "0123" ) );
+
+    text = blockRead( stream1, UTF8, 3, 2 );
+    assertThat( text, is( "345" ) );
+  }
+
+  //   C1 PC/IB; C2 PC/OB; C1 AC; EE
+  @Test
+  public void testS_C1_PC_IB__C2_PC_OB__C1_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream1 = replay.getContent();
+    text = byteRead( stream1, 3 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = byteRead( stream2, 6 );
+    assertThat( text, is( "012345" ) );
+
+    try {
+      byteRead( stream1, 1 );
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB; C2 PC/OB; C1 AC; EE
+  @Test
+  public void testB_C1_PC_IB__C2_PC_OB__C1_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    stream1 = replay.getContent();
+    text = blockRead( stream1, UTF8, 3, 2 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = blockRead( stream2, UTF8, 6, 4 );
+    assertThat( text, is( "012345" ) );
+
+    try {
+      blockRead( stream1, UTF8, 6, 4 );
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testWriteTo() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    replay.writeTo( buffer );
+    String output = new String( buffer.toByteArray(), UTF8 );
+    assertThat( output, is( input ) );
+  }
+
+  @Test
+  public void testIsRepeatable() throws Exception {
+    String text = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( text.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic );
+    assertThat( replay.isRepeatable(), is( true ) );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( text.getBytes( UTF8 ) ) );
+    BufferedHttpEntity buffered = new BufferedHttpEntity( basic );
+    replay = new PartiallyRepeatableHttpEntity( buffered );
+    assertThat( replay.isRepeatable(), is( true ) );
+  }
+
+  @Test
+  public void testIsChunked() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.isChunked(), is( false ) );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    basic.setChunked( true );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.isChunked(), is( true ) );
+  }
+
+  @Test
+  public void testGetContentLength() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.getContentLength(), is( -1L ) );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    basic.setContentLength( input.length() );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.getContentLength(), is( 10L ) );
+  }
+
+  @Test
+  public void testGetContentType() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.getContentType(), nullValue() );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    basic.setContentType( ContentType.APPLICATION_JSON.getMimeType() );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.getContentType().getValue(), is( "application/json" ) );
+  }
+
+  @Test
+  public void testGetContentEncoding() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.getContentEncoding(), nullValue() );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    basic.setContentEncoding( "UTF-8" );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.getContentEncoding().getValue(), is( "UTF-8" ) );
+  }
+
+  @Test
+  public void testIsStreaming() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    InputStreamEntity streaming;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.isStreaming(), is( true ) );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( null );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+    assertThat( replay.isStreaming(), is( false ) );
+
+    streaming = new InputStreamEntity( new ByteArrayInputStream( input.getBytes( UTF8 ) ), 10, ContentType.TEXT_PLAIN );
+    replay = new PartiallyRepeatableHttpEntity( streaming, 5 );
+    assertThat( replay.isStreaming(), is( true ) );
+  }
+
+  @Test
+  public void testConsumeContent() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    PartiallyRepeatableHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new PartiallyRepeatableHttpEntity( basic, 5 );
+
+    try {
+      replay.consumeContent();
+      fail( "Expected UnsupportedOperationException" );
+    } catch ( UnsupportedOperationException e ) {
+      // Expected.
+    }
+  }
+
+  private static String byteRead( InputStream stream, int total ) throws IOException {
+    StringBuilder string = null;
+    int c = 0;
+    if( total < 0 ) {
+      total = Integer.MAX_VALUE;
+    }
+    while( total > 0 && c >= 0 ) {
+      c = stream.read();
+      if( c >= 0 ) {
+        total--;
+        if( string == null ) {
+          string = new StringBuilder();
+        }
+        string.append( (char)c );
+      }
+    }
+    return string == null ? null : string.toString();
+  }
+
+  private static String blockRead( InputStream stream, Charset charset, int total, int chunk ) throws IOException {
+    StringBuilder string = null;
+    byte buffer[] = new byte[ chunk ];
+    int count = 0;
+    if( total < 0 ) {
+      total = Integer.MAX_VALUE;
+    }
+    while( total > 0 && count >= 0 ) {
+      count = stream.read( buffer, 0, Math.min( buffer.length, total ) );
+      if( count >= 0 ) {
+        total -= count;
+        if( string == null ) {
+          string = new StringBuilder();
+        }
+        string.append( new String( buffer, 0, count, charset ) );
+      }
+    }
+    return string == null ? null : string.toString();
+  }
+
+}