You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by di...@apache.org on 2013/10/02 19:49:06 UTC

git commit: KNOX-174, upport service specific cap for buffering request entities for replay against WWW-authenticate challenge

Updated Branches:
  refs/heads/v0.3.0 abbe234d1 -> 61d3d15a7


KNOX-174, upport service specific cap for buffering request entities for replay against WWW-authenticate challenge


Project: http://git-wip-us.apache.org/repos/asf/incubator-knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-knox/commit/61d3d15a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-knox/tree/61d3d15a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-knox/diff/61d3d15a

Branch: refs/heads/v0.3.0
Commit: 61d3d15a70255a8beb19270f39fd9d25f81320e9
Parents: abbe234
Author: Dilli Dorai Arumugam <da...@hortonworks.com>
Authored: Wed Oct 2 10:47:05 2013 -0700
Committer: Dilli Dorai Arumugam <da...@hortonworks.com>
Committed: Wed Oct 2 10:47:05 2013 -0700

----------------------------------------------------------------------
 .../impl/DispatchDeploymentContributor.java     |  16 +
 .../dispatch/CappedBufferHttpEntity.java        | 152 ++++
 .../gateway/dispatch/HttpClientDispatch.java    |  49 +-
 .../dispatch/CappedBufferHttpEntityTest.java    | 854 +++++++++++++++++++
 .../dispatch/HttpClientDispatchTest.java        |   5 +-
 .../oozie/OozieDeploymentContributor.java       |  31 +-
 6 files changed, 1083 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
index 91951da..c9189fb 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
@@ -26,9 +26,15 @@ import org.apache.hadoop.gateway.dispatch.HttpClientDispatch;
 import org.apache.hadoop.gateway.topology.Provider;
 import org.apache.hadoop.gateway.topology.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class DispatchDeploymentContributor extends ProviderDeploymentContributorBase {
+  
+  private static final String REPLAY_BUFFER_SIZE_PARAM = "replayBufferSize";
+  
+  // Default global replay buffer size in KB
+  public static final String DEFAULT_REPLAY_BUFFER_SIZE = "4";
 
   @Override
   public String getRole() {
@@ -42,7 +48,17 @@ public class DispatchDeploymentContributor extends ProviderDeploymentContributor
 
   @Override
   public void contributeFilter( DeploymentContext context, Provider provider, Service service, ResourceDescriptor resource, List<FilterParamDescriptor> params ) {
+    String replayBufferSize = DEFAULT_REPLAY_BUFFER_SIZE;
+    if (params != null) {
+      for (FilterParamDescriptor paramDescriptor : params) {
+        if (REPLAY_BUFFER_SIZE_PARAM.equals( paramDescriptor.name() )) {
+          replayBufferSize = paramDescriptor.value();
+          break;
+        }
+      }
+    }
     FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HttpClientDispatch.class );
+    filter.param().name("replayBufferSize").value(replayBufferSize);
     if( context.getGatewayConfig().isHadoopKerberosSecured() ) {
       filter.param().name("kerberos").value("true");
     }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntity.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntity.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntity.java
new file mode 100644
index 0000000..d5f7563
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntity.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.http.entity.HttpEntityWrapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+@NotThreadSafe
+public class CappedBufferHttpEntity extends HttpEntityWrapper {
+
+  public static final int DEFAULT_BUFFER_SIZE = 4096;
+
+  private int replayWriteIndex;
+  private int replayWriteLimit;
+  private byte[] replayBuffer;
+  private InputStream wrappedStream;
+
+  public CappedBufferHttpEntity( final HttpEntity entity, int bufferSize ) throws IOException {
+    super( entity );
+    this.wrappedStream = null;
+    this.replayWriteIndex = -1;
+    if( !entity.isRepeatable() ) {
+      this.replayBuffer = new byte[ bufferSize ];
+      this.replayWriteLimit = bufferSize-1;
+    } else {
+      this.replayBuffer = null;
+    }
+  }
+
+  public CappedBufferHttpEntity( final HttpEntity entity ) throws IOException {
+    this( entity, DEFAULT_BUFFER_SIZE );
+  }
+
+  @Override
+  public boolean isRepeatable() {
+    return true;
+  }
+
+  @Override
+  public boolean isStreaming() {
+    return wrappedEntity.isStreaming();
+  }
+
+  @Override
+  public boolean isChunked() {
+    return wrappedEntity.isChunked();
+  }
+
+  @Override
+  public long getContentLength() {
+    return wrappedEntity.getContentLength();
+  }
+
+  // This will throw an IOException if an attempt is made to getContent a second time after
+  // more bytes than the buffer can hold has been read on the first stream.
+  @Override
+  public InputStream getContent() throws IOException {
+    // If the wrapped stream is repeatable return it directly.
+    if( replayBuffer == null ) {
+      return wrappedEntity.getContent();
+    // Else if the buffer has overflowed
+    } else {
+      if( wrappedStream == null ) {
+         wrappedStream = wrappedEntity.getContent();
+      }
+      return new ReplayStream();
+    }
+  }
+
+  @Override
+  public void writeTo( final OutputStream stream ) throws IOException {
+    IOUtils.copy( getContent(), stream );
+  }
+
+  @Override
+  public void consumeContent() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  private class ReplayStream extends InputStream {
+
+    private int replayReadIndex = -1;
+
+    @Override
+    public int read() throws IOException {
+      int b;
+      // If we can read from the buffer do so.
+      if( replayReadIndex < replayWriteIndex ) {
+        b = replayBuffer[ ++replayReadIndex ];
+      } else {
+        b = wrappedStream.read();
+        // If the underlying stream is not closed.
+        if( b > -1 ) {
+          if( replayWriteIndex < replayWriteLimit ) {
+            replayBuffer[ ++replayWriteIndex ] = (byte)b;
+            replayReadIndex++;
+          } else {
+            throw new IOException("Hit replay buffer max limit");
+          }
+        }
+      }
+      return b;
+    }
+
+    public int read( byte buffer[], int offset, int limit ) throws IOException {
+      int count = -1;
+      // If we can read from the buffer do so.
+      if( replayReadIndex < replayWriteIndex ) {
+        count = replayWriteIndex - replayReadIndex;
+        count = Math.min( limit, count );
+        System.arraycopy( replayBuffer, replayReadIndex+1, buffer, offset, count );
+        replayReadIndex += count;
+      } else {
+        count = wrappedStream.read( buffer, offset, limit );
+        // If the underlying stream is not closed.
+        if( count > -1 ) {
+          if( replayWriteIndex+count < replayWriteLimit ) {
+            System.arraycopy( buffer, offset, replayBuffer, replayWriteIndex+1, count );
+            replayReadIndex += count;
+            replayWriteIndex += count;
+          } else {
+            throw new IOException("Hit replay buffer max limit");
+          }
+        }
+      }
+      return count;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
index 616630c..546c1d0 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
@@ -17,6 +17,16 @@
  */
 package org.apache.hadoop.gateway.dispatch;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
 import org.apache.hadoop.gateway.GatewayMessages;
 import org.apache.hadoop.gateway.GatewayResources;
 import org.apache.hadoop.gateway.config.GatewayConfig;
@@ -33,24 +43,18 @@ import org.apache.http.client.methods.HttpOptions;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.entity.BufferedHttpEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.entity.InputStreamEntity;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.message.BasicHeader;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-
 /**
  *
  */
 public class HttpClientDispatch extends AbstractGatewayDispatch {
   
+  private static final String REPLAY_BUFFER_SIZE = "replayBufferSize";
+  
   // private static final String CT_APP_WWW_FORM_URL_ENCODED = "application/x-www-form-urlencoded";
   // private static final String CT_APP_XML = "application/xml";
   private static final String Q_DELEGATION_EQ = "?delegation=";
@@ -62,9 +66,22 @@ public class HttpClientDispatch extends AbstractGatewayDispatch {
 
   private static GatewayMessages LOG = MessagesFactory.get( GatewayMessages.class );
   private static GatewayResources RES = ResourcesFactory.get( GatewayResources.class );
-  private static final int REPLAY_BUFFER_MAX_SIZE = 1024 * 1024; // limit to 1MB
+  private static final int DEFAULT_REPLAY_BUFFER_SIZE =  4 * 1024; // 4K
 
-  private AppCookieManager appCookieManager = new AppCookieManager();;
+  private AppCookieManager appCookieManager = new AppCookieManager();
+  
+  private static final String REPLAY_BUFFER_SIZE_PARAM = "replayBufferSize";
+  
+  private int replayBufferSize = 0;
+  
+  @Override
+  public void init( FilterConfig filterConfig ) throws ServletException {
+    super.init(filterConfig);
+    String replayBufferSizeString = filterConfig.getInitParameter( REPLAY_BUFFER_SIZE_PARAM );
+    if ( replayBufferSizeString != null ) {
+      setReplayBufferSize(Integer.valueOf(replayBufferSizeString));
+    }
+  }
   
   protected void executeRequest(
       HttpUriRequest outboundRequest,
@@ -193,8 +210,8 @@ public class HttpClientDispatch extends AbstractGatewayDispatch {
         delegationTokenPresent = queryString.startsWith("delegation=") || 
             queryString.contains("&delegation=");
       }     
-      if (!delegationTokenPresent) {
-        entity = new PartiallyRepeatableHttpEntity( entity );
+      if (!delegationTokenPresent && getReplayBufferSize() > 0 ) {
+          entity = new CappedBufferHttpEntity( entity, getReplayBufferSize() * 1024 );
       }
     }
 
@@ -246,4 +263,12 @@ public class HttpClientDispatch extends AbstractGatewayDispatch {
     executeRequest( method, request, response );
   }
 
+  int getReplayBufferSize() {
+    return replayBufferSize;
+  }
+  
+  void setReplayBufferSize(int size) {
+    replayBufferSize = size;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntityTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntityTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntityTest.java
new file mode 100644
index 0000000..e558ea8
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntityTest.java
@@ -0,0 +1,854 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import org.apache.http.entity.BasicHttpEntity;
+import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.InputStreamEntity;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.fail;
+
+public class CappedBufferHttpEntityTest {
+
+  private static Charset UTF8 = Charset.forName( "UTF-8" );
+
+  // Variables
+  // Consumers: C1, C2
+  // Reads: FC - Full Content, PC - Partial Content, AC - Any Content
+  // Reads: IB - In Buffer, OB - Overflow Buffer
+  // Close: XC
+  // Expect: EE
+
+  // Test Cases
+  // C1 FC
+  //   C1 FC/IB.
+  //   C1 FC/OB.
+  //   C1 FC/IB; C2 FC.
+  //   C1 FC/OB; C2 AC; EE
+  //   C1 FC/IB; C1 XC; C2 FC.
+  //   C1 FC/OB; C1 XC; C2 AC; EE
+  // C1 PC
+  //   C1 PC/IB.
+  //   C1 PC/OB.
+  //   C1 PC/IB; C2 FC.
+  //   C1 PC/OB; C2 AC; EE
+  //   C1 PC/IB; C1 XC; C2 FC.
+  //   C1 PC/OB; C1 XC; C2 AC; EE
+  // C1 C2 C1
+  //   C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+  //   C1 PC/IB; C2 PC/OB; C1 AC; EE
+
+  @Test
+  public void testS__C1_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testB__C1_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testS__C1_FC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    String output;
+
+    try {
+      output = byteRead( replay.getContent(), -1 );
+      fail("expected IOException");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testB__C1_FC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    String output;
+
+    try {
+      output = blockRead( replay.getContent(), UTF8, -1, 3 );
+      fail("expected IOException");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testS_C1_FC_IB__C2_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    String output;
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+
+    output = byteRead( replay.getContent(), -1 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testB_C1_FC_IB__C2_FC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    String output;
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+
+    output = blockRead( replay.getContent(), UTF8, -1, 3 );
+    assertThat( output, is( data ) );
+  }
+
+  @Test
+  public void testS_C1_FC_OB__C2_AC__EE() throws Exception {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    String output;
+
+    try {
+      output = byteRead( replay.getContent(), -1 );
+    fail( "Expected IOException" );
+    } catch( IOException e ) {
+     // Expected.
+   }
+ 
+  }
+
+  @Test
+  public void testB_C1_FC_OB__C2_AC__EE() throws Exception {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    String output;
+    try {
+      output = blockRead( replay.getContent(), UTF8, -1, 3 );
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 FC/IB; C1 XC; C2 FC.
+  @Test
+  public void testS_C1_FC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 FC/IB; C1 XC; C2 FC.
+  @Test
+  public void testB_C1_FC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 3 );
+    assertThat( text, is( "0123456789" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 3 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 FC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testS_C1_FC_OB__C1_XC__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    try {
+      text = byteRead( stream, -1 );
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 FC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testB_C1_FC_OB__C1_XC__C2_AC_EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    try {
+      text = blockRead( stream, UTF8, -1, 3 );
+      fail( "Expected IOException" );
+    } catch( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB.
+  @Test
+  public void testS_C1_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 3 );
+    assertThat( text, is( "012" ) );
+  }
+
+  //   C1 PC/IB.
+  @Test
+  public void testB_C1_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 3, 3 );
+    assertThat( text, is( "012" ) );
+  }
+
+  //   C1 PC/OB.
+  @Test
+  public void testS_C1_PC_OB() throws IOException {
+
+    try {
+      String data = "0123456789";
+      BasicHttpEntity basic;
+      CappedBufferHttpEntity replay;
+      InputStream stream;
+      String text;
+
+      basic = new BasicHttpEntity();
+      basic.setContent(new ByteArrayInputStream(data.getBytes(UTF8)));
+      replay = new CappedBufferHttpEntity(basic, 5);
+      stream = replay.getContent();
+      text = byteRead(stream, -1);
+      fail("Expected IOException");
+      assertThat(text, is("0123456789"));
+      stream.close();
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  //   C1 PC/OB.
+  @Test
+  public void testB_C1_PC_OB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    try {
+      text = blockRead( stream, UTF8, -1, 4 );
+      fail( "Expected IOException" );
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  //   C1 PC/IB; C2 FC.
+  @Test
+  public void testS_C1_PC_IB__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 4 );
+    assertThat( text, is( "0123" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/IB; C2 FC.
+  @Test
+  public void testB_C1_PC_IB__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 4, 1 );
+    assertThat( text, is( "0123" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 7 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/OB; C2 AC; EE
+  @Test
+  public void testS_C1_PC_OB__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    try {
+      basic = new BasicHttpEntity();
+      basic.setContent(new ByteArrayInputStream(data.getBytes(UTF8)));
+      replay = new CappedBufferHttpEntity(basic, 5);
+
+      stream = replay.getContent();
+      text = byteRead(stream, 7);
+      assertThat(text, is("0123456"));
+      stream.close();
+      fail("Expected IOException");
+    } catch (IOException e) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/OB; C2 AC; EE
+  @Test
+  public void testB_C1_PC_OB__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    try {
+      text = blockRead( stream, UTF8, 7, 2 );
+      fail("Expected IOExceptin");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  //   C1 PC/IB; C1 XC; C2 FC.
+  @Test
+  public void testS_C1_PC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = byteRead( stream, 7 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = byteRead( stream, -1 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/IB; C1 XC; C2 FC.
+  @Test
+  public void testB_C1_PC_IB__C1_XC__C2_FC() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, 7, 2 );
+    assertThat( text, is( "0123456" ) );
+    stream.close();
+
+    stream = replay.getContent();
+    text = blockRead( stream, UTF8, -1, 7 );
+    assertThat( text, is( "0123456789" ) );
+  }
+
+  //   C1 PC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testS_C1_PC_OB__C1_XC__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    try {
+      stream = replay.getContent();
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/OB; C1 XC; C2 AC; EE
+  @Test
+  public void testB_C1_PC_OB__C1_XC__C2_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    stream = replay.getContent();
+    try {    
+      text = blockRead( stream, UTF8, 7, 2 );
+      fail( "Expected IOException" );
+    } catch ( IOException e ) {
+      // Expected.
+    }
+  }
+
+  //   C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+  @Test
+  public void testS_C1_PC_IB__C2_PC_IB__C2_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+
+    stream1 = replay.getContent();
+    text = byteRead( stream1, 3 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = byteRead( stream2, 4 );
+    assertThat( text, is( "0123" ) );
+
+    text = byteRead( stream1, 3 );
+    assertThat( text, is( "345" ) );
+  }
+
+  //   C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+  @Test
+  public void testB_C1_PC_IB__C2_PC_IB__C2_PC_IB() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 20 );
+    stream1 = replay.getContent();
+    text = blockRead( stream1, UTF8, 3, 2 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = blockRead( stream2, UTF8, 4, 3 );
+    assertThat( text, is( "0123" ) );
+
+    text = blockRead( stream1, UTF8, 3, 2 );
+    assertThat( text, is( "345" ) );
+  }
+
+  //   C1 PC/IB; C2 PC/OB; C1 AC; EE
+  @Test
+  public void testS_C1_PC_IB__C2_PC_OB__C1_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    stream1 = replay.getContent();
+    text = byteRead( stream1, 3 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    text = byteRead( stream2, 5 );
+    assertThat( text, is( "01234" ) );
+  }
+
+  //   C1 PC/IB; C2 PC/OB; C1 AC; EE
+  @Test
+  public void testB_C1_PC_IB__C2_PC_OB__C1_AC__EE() throws IOException {
+    String data = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+    InputStream stream1, stream2;
+    String text;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    stream1 = replay.getContent();
+    text = blockRead( stream1, UTF8, 3, 2 );
+    assertThat( text, is( "012" ) );
+
+    stream2 = replay.getContent();
+    try {
+      text = blockRead( stream2, UTF8, 6, 4 );
+      fail("expected IOException");
+    } catch (IOException e) {
+      // expected
+    }
+ 
+  }
+
+  @Test
+  public void testWriteTo() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    try {
+      replay.writeTo( buffer );
+      fail("expected IOException");
+    } catch (IOException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testIsRepeatable() throws Exception {
+    String text = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( text.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic );
+    assertThat( replay.isRepeatable(), is( true ) );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( text.getBytes( UTF8 ) ) );
+    BufferedHttpEntity buffered = new BufferedHttpEntity( basic );
+    replay = new CappedBufferHttpEntity( buffered );
+    assertThat( replay.isRepeatable(), is( true ) );
+  }
+
+  @Test
+  public void testIsChunked() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.isChunked(), is( false ) );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    basic.setChunked( true );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.isChunked(), is( true ) );
+  }
+
+  @Test
+  public void testGetContentLength() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.getContentLength(), is( -1L ) );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    basic.setContentLength( input.length() );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.getContentLength(), is( 10L ) );
+  }
+
+  @Test
+  public void testGetContentType() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.getContentType(), nullValue() );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    basic.setContentType( ContentType.APPLICATION_JSON.getMimeType() );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.getContentType().getValue(), is( "application/json" ) );
+  }
+
+  @Test
+  public void testGetContentEncoding() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.getContentEncoding(), nullValue() );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    basic.setContentEncoding( "UTF-8" );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.getContentEncoding().getValue(), is( "UTF-8" ) );
+  }
+
+  @Test
+  public void testIsStreaming() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    InputStreamEntity streaming;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.isStreaming(), is( true ) );
+
+    basic = new BasicHttpEntity();
+    basic.setContent( null );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+    assertThat( replay.isStreaming(), is( false ) );
+
+    streaming = new InputStreamEntity( new ByteArrayInputStream( input.getBytes( UTF8 ) ), 10, ContentType.TEXT_PLAIN );
+    replay = new CappedBufferHttpEntity( streaming, 5 );
+    assertThat( replay.isStreaming(), is( true ) );
+  }
+
+  @Test
+  public void testConsumeContent() throws Exception {
+    String input = "0123456789";
+    BasicHttpEntity basic;
+    CappedBufferHttpEntity replay;
+
+    basic = new BasicHttpEntity();
+    basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+    replay = new CappedBufferHttpEntity( basic, 5 );
+
+    try {
+      replay.consumeContent();
+      fail( "Expected UnsupportedOperationException" );
+    } catch ( UnsupportedOperationException e ) {
+      // Expected.
+    }
+  }
+
+  private static String byteRead( InputStream stream, int total ) throws IOException {
+    StringBuilder string = null;
+    int c = 0;
+    if( total < 0 ) {
+      total = Integer.MAX_VALUE;
+    }
+    while( total > 0 && c >= 0 ) {
+      c = stream.read();
+      if( c >= 0 ) {
+        total--;
+        if( string == null ) {
+          string = new StringBuilder();
+        }
+        string.append( (char)c );
+      }
+    }
+    return string == null ? null : string.toString();
+  }
+
+  private static String blockRead( InputStream stream, Charset charset, int total, int chunk ) throws IOException {
+    StringBuilder string = null;
+    byte buffer[] = new byte[ chunk ];
+    int count = 0;
+    if( total < 0 ) {
+      total = Integer.MAX_VALUE;
+    }
+    while( total > 0 && count >= 0 ) {
+      count = stream.read( buffer, 0, Math.min( buffer.length, total ) );
+      if( count >= 0 ) {
+        total -= count;
+        if( string == null ) {
+          string = new StringBuilder();
+        }
+        string.append( new String( buffer, 0, count, charset ) );
+      }
+    }
+    return string == null ? null : string.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
index d429deb..660a627 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
@@ -99,13 +99,14 @@ public class HttpClientDispatchTest {
     HttpEntity httpEntity = httpClientDispatch.createRequestEntity(inboundRequest);
     System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "false");
     assertFalse("buffering in the presence of delegation token", 
-        (httpEntity instanceof PartiallyRepeatableHttpEntity));
+        (httpEntity instanceof CappedBufferHttpEntity));
   }
   
   @Test
   public void testCallToSecureClusterWithoutDelegationTpken() throws URISyntaxException, IOException {
     System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "true");
     HttpClientDispatch httpClientDispatch = new HttpClientDispatch();
+    httpClientDispatch.setReplayBufferSize(10);
     ServletInputStream inputStream = EasyMock.createNiceMock( ServletInputStream.class );
     HttpServletRequest inboundRequest = EasyMock.createNiceMock( HttpServletRequest.class );
     EasyMock.expect(inboundRequest.getQueryString()).andReturn( "a=123").anyTimes();
@@ -114,7 +115,7 @@ public class HttpClientDispatchTest {
     HttpEntity httpEntity = httpClientDispatch.createRequestEntity(inboundRequest);
     System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "false");
     assertTrue("not buffering in the absence of delegation token", 
-        (httpEntity instanceof PartiallyRepeatableHttpEntity));
+        (httpEntity instanceof CappedBufferHttpEntity));
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java b/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
index 7215945..fe78645 100644
--- a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
+++ b/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
@@ -17,14 +17,6 @@
  */
 package org.apache.hadoop.gateway.oozie;
 
-import org.apache.hadoop.gateway.deploy.DeploymentContext;
-import org.apache.hadoop.gateway.deploy.ServiceDeploymentContributorBase;
-import org.apache.hadoop.gateway.descriptor.FilterParamDescriptor;
-import org.apache.hadoop.gateway.descriptor.ResourceDescriptor;
-import org.apache.hadoop.gateway.filter.rewrite.api.UrlRewriteRulesDescriptor;
-import org.apache.hadoop.gateway.filter.rewrite.api.UrlRewriteRulesDescriptorFactory;
-import org.apache.hadoop.gateway.topology.Service;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -33,10 +25,23 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.gateway.deploy.DeploymentContext;
+import org.apache.hadoop.gateway.deploy.ServiceDeploymentContributorBase;
+import org.apache.hadoop.gateway.descriptor.FilterParamDescriptor;
+import org.apache.hadoop.gateway.descriptor.ResourceDescriptor;
+import org.apache.hadoop.gateway.filter.rewrite.api.UrlRewriteRulesDescriptor;
+import org.apache.hadoop.gateway.filter.rewrite.api.UrlRewriteRulesDescriptorFactory;
+import org.apache.hadoop.gateway.topology.Service;
+
 public class OozieDeploymentContributor extends ServiceDeploymentContributorBase {
 
   private static final String RULES_RESOURCE = OozieDeploymentContributor.class.getName().replace( '.', '/' ) + "/rewrite.xml";
   private static final String EXTERNAL_PATH = "/oozie";
+  
+  private static final String REPLAY_BUFFER_SIZE_PARAM = "replayBufferSize";
+  
+  // Oozie replay buffer size in KB
+  private static final String OOZIW_REPLAY_BUFFER_SIZE = "8";
 
   @Override
   public String getRole() {
@@ -96,8 +101,14 @@ public class OozieDeploymentContributor extends ServiceDeploymentContributorBase
     context.contributeFilter( service, resource, "rewrite", null, params );
   }
 
-  private void addDispatchFilter(DeploymentContext context, Service service, ResourceDescriptor resource ) {
-    context.contributeFilter( service, resource, "dispatch", null, null );
+  private void addDispatchFilter(DeploymentContext context, Service service,
+      ResourceDescriptor resource) {
+    List<FilterParamDescriptor> filterParams = new ArrayList<FilterParamDescriptor>();
+    FilterParamDescriptor filterParamDescriptor = resource.createFilterParam();
+    filterParamDescriptor.name(REPLAY_BUFFER_SIZE_PARAM);
+    filterParamDescriptor.value(OOZIW_REPLAY_BUFFER_SIZE);
+    filterParams.add(filterParamDescriptor);
+    context.contributeFilter(service, resource, "dispatch", null, filterParams);
   }
 
   UrlRewriteRulesDescriptor loadRulesFromTemplate() throws IOException {