You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by di...@apache.org on 2013/10/02 19:49:06 UTC
git commit: KNOX-174,
upport service specific cap for buffering request entities for replay
against WWW-authenticate challenge
Updated Branches:
refs/heads/v0.3.0 abbe234d1 -> 61d3d15a7
KNOX-174, upport service specific cap for buffering request entities for replay against WWW-authenticate challenge
Project: http://git-wip-us.apache.org/repos/asf/incubator-knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-knox/commit/61d3d15a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-knox/tree/61d3d15a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-knox/diff/61d3d15a
Branch: refs/heads/v0.3.0
Commit: 61d3d15a70255a8beb19270f39fd9d25f81320e9
Parents: abbe234
Author: Dilli Dorai Arumugam <da...@hortonworks.com>
Authored: Wed Oct 2 10:47:05 2013 -0700
Committer: Dilli Dorai Arumugam <da...@hortonworks.com>
Committed: Wed Oct 2 10:47:05 2013 -0700
----------------------------------------------------------------------
.../impl/DispatchDeploymentContributor.java | 16 +
.../dispatch/CappedBufferHttpEntity.java | 152 ++++
.../gateway/dispatch/HttpClientDispatch.java | 49 +-
.../dispatch/CappedBufferHttpEntityTest.java | 854 +++++++++++++++++++
.../dispatch/HttpClientDispatchTest.java | 5 +-
.../oozie/OozieDeploymentContributor.java | 31 +-
6 files changed, 1083 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
index 91951da..c9189fb 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
@@ -26,9 +26,15 @@ import org.apache.hadoop.gateway.dispatch.HttpClientDispatch;
import org.apache.hadoop.gateway.topology.Provider;
import org.apache.hadoop.gateway.topology.Service;
+import java.util.ArrayList;
import java.util.List;
public class DispatchDeploymentContributor extends ProviderDeploymentContributorBase {
+
+ private static final String REPLAY_BUFFER_SIZE_PARAM = "replayBufferSize";
+
+ // Default global replay buffer size in KB
+ public static final String DEFAULT_REPLAY_BUFFER_SIZE = "4";
@Override
public String getRole() {
@@ -42,7 +48,17 @@ public class DispatchDeploymentContributor extends ProviderDeploymentContributor
@Override
public void contributeFilter( DeploymentContext context, Provider provider, Service service, ResourceDescriptor resource, List<FilterParamDescriptor> params ) {
+ String replayBufferSize = DEFAULT_REPLAY_BUFFER_SIZE;
+ if (params != null) {
+ for (FilterParamDescriptor paramDescriptor : params) {
+ if (REPLAY_BUFFER_SIZE_PARAM.equals( paramDescriptor.name() )) {
+ replayBufferSize = paramDescriptor.value();
+ break;
+ }
+ }
+ }
FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HttpClientDispatch.class );
+ filter.param().name("replayBufferSize").value(replayBufferSize);
if( context.getGatewayConfig().isHadoopKerberosSecured() ) {
filter.param().name("kerberos").value("true");
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntity.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntity.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntity.java
new file mode 100644
index 0000000..d5f7563
--- /dev/null
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntity.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.annotation.NotThreadSafe;
+import org.apache.http.entity.HttpEntityWrapper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+@NotThreadSafe
+public class CappedBufferHttpEntity extends HttpEntityWrapper {
+
+ public static final int DEFAULT_BUFFER_SIZE = 4096;
+
+ private int replayWriteIndex;
+ private int replayWriteLimit;
+ private byte[] replayBuffer;
+ private InputStream wrappedStream;
+
+ public CappedBufferHttpEntity( final HttpEntity entity, int bufferSize ) throws IOException {
+ super( entity );
+ this.wrappedStream = null;
+ this.replayWriteIndex = -1;
+ if( !entity.isRepeatable() ) {
+ this.replayBuffer = new byte[ bufferSize ];
+ this.replayWriteLimit = bufferSize-1;
+ } else {
+ this.replayBuffer = null;
+ }
+ }
+
+ public CappedBufferHttpEntity( final HttpEntity entity ) throws IOException {
+ this( entity, DEFAULT_BUFFER_SIZE );
+ }
+
+ @Override
+ public boolean isRepeatable() {
+ return true;
+ }
+
+ @Override
+ public boolean isStreaming() {
+ return wrappedEntity.isStreaming();
+ }
+
+ @Override
+ public boolean isChunked() {
+ return wrappedEntity.isChunked();
+ }
+
+ @Override
+ public long getContentLength() {
+ return wrappedEntity.getContentLength();
+ }
+
+ // This will throw an IOException if an attempt is made to getContent a second time after
+ // more bytes than the buffer can hold has been read on the first stream.
+ @Override
+ public InputStream getContent() throws IOException {
+ // If the wrapped stream is repeatable return it directly.
+ if( replayBuffer == null ) {
+ return wrappedEntity.getContent();
+ // Else if the buffer has overflowed
+ } else {
+ if( wrappedStream == null ) {
+ wrappedStream = wrappedEntity.getContent();
+ }
+ return new ReplayStream();
+ }
+ }
+
+ @Override
+ public void writeTo( final OutputStream stream ) throws IOException {
+ IOUtils.copy( getContent(), stream );
+ }
+
+ @Override
+ public void consumeContent() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ private class ReplayStream extends InputStream {
+
+ private int replayReadIndex = -1;
+
+ @Override
+ public int read() throws IOException {
+ int b;
+ // If we can read from the buffer do so.
+ if( replayReadIndex < replayWriteIndex ) {
+ b = replayBuffer[ ++replayReadIndex ];
+ } else {
+ b = wrappedStream.read();
+ // If the underlying stream is not closed.
+ if( b > -1 ) {
+ if( replayWriteIndex < replayWriteLimit ) {
+ replayBuffer[ ++replayWriteIndex ] = (byte)b;
+ replayReadIndex++;
+ } else {
+ throw new IOException("Hit replay buffer max limit");
+ }
+ }
+ }
+ return b;
+ }
+
+ public int read( byte buffer[], int offset, int limit ) throws IOException {
+ int count = -1;
+ // If we can read from the buffer do so.
+ if( replayReadIndex < replayWriteIndex ) {
+ count = replayWriteIndex - replayReadIndex;
+ count = Math.min( limit, count );
+ System.arraycopy( replayBuffer, replayReadIndex+1, buffer, offset, count );
+ replayReadIndex += count;
+ } else {
+ count = wrappedStream.read( buffer, offset, limit );
+ // If the underlying stream is not closed.
+ if( count > -1 ) {
+ if( replayWriteIndex+count < replayWriteLimit ) {
+ System.arraycopy( buffer, offset, replayBuffer, replayWriteIndex+1, count );
+ replayReadIndex += count;
+ replayWriteIndex += count;
+ } else {
+ throw new IOException("Hit replay buffer max limit");
+ }
+ }
+ }
+ return count;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
index 616630c..546c1d0 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
@@ -17,6 +17,16 @@
*/
package org.apache.hadoop.gateway.dispatch;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
import org.apache.hadoop.gateway.GatewayMessages;
import org.apache.hadoop.gateway.GatewayResources;
import org.apache.hadoop.gateway.config.GatewayConfig;
@@ -33,24 +43,18 @@ import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicHeader;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-
/**
*
*/
public class HttpClientDispatch extends AbstractGatewayDispatch {
+ private static final String REPLAY_BUFFER_SIZE = "replayBufferSize";
+
// private static final String CT_APP_WWW_FORM_URL_ENCODED = "application/x-www-form-urlencoded";
// private static final String CT_APP_XML = "application/xml";
private static final String Q_DELEGATION_EQ = "?delegation=";
@@ -62,9 +66,22 @@ public class HttpClientDispatch extends AbstractGatewayDispatch {
private static GatewayMessages LOG = MessagesFactory.get( GatewayMessages.class );
private static GatewayResources RES = ResourcesFactory.get( GatewayResources.class );
- private static final int REPLAY_BUFFER_MAX_SIZE = 1024 * 1024; // limit to 1MB
+ private static final int DEFAULT_REPLAY_BUFFER_SIZE = 4 * 1024; // 4K
- private AppCookieManager appCookieManager = new AppCookieManager();;
+ private AppCookieManager appCookieManager = new AppCookieManager();
+
+ private static final String REPLAY_BUFFER_SIZE_PARAM = "replayBufferSize";
+
+ private int replayBufferSize = 0;
+
+ @Override
+ public void init( FilterConfig filterConfig ) throws ServletException {
+ super.init(filterConfig);
+ String replayBufferSizeString = filterConfig.getInitParameter( REPLAY_BUFFER_SIZE_PARAM );
+ if ( replayBufferSizeString != null ) {
+ setReplayBufferSize(Integer.valueOf(replayBufferSizeString));
+ }
+ }
protected void executeRequest(
HttpUriRequest outboundRequest,
@@ -193,8 +210,8 @@ public class HttpClientDispatch extends AbstractGatewayDispatch {
delegationTokenPresent = queryString.startsWith("delegation=") ||
queryString.contains("&delegation=");
}
- if (!delegationTokenPresent) {
- entity = new PartiallyRepeatableHttpEntity( entity );
+ if (!delegationTokenPresent && getReplayBufferSize() > 0 ) {
+ entity = new CappedBufferHttpEntity( entity, getReplayBufferSize() * 1024 );
}
}
@@ -246,4 +263,12 @@ public class HttpClientDispatch extends AbstractGatewayDispatch {
executeRequest( method, request, response );
}
+ int getReplayBufferSize() {
+ return replayBufferSize;
+ }
+
+ void setReplayBufferSize(int size) {
+ replayBufferSize = size;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntityTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntityTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntityTest.java
new file mode 100644
index 0000000..e558ea8
--- /dev/null
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/CappedBufferHttpEntityTest.java
@@ -0,0 +1,854 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import org.apache.http.entity.BasicHttpEntity;
+import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.InputStreamEntity;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.fail;
+
+public class CappedBufferHttpEntityTest {
+
+ private static Charset UTF8 = Charset.forName( "UTF-8" );
+
+ // Variables
+ // Consumers: C1, C2
+ // Reads: FC - Full Content, PC - Partial Content, AC - Any Content
+ // Reads: IB - In Buffer, OB - Overflow Buffer
+ // Close: XC
+ // Expect: EE
+
+ // Test Cases
+ // C1 FC
+ // C1 FC/IB.
+ // C1 FC/OB.
+ // C1 FC/IB; C2 FC.
+ // C1 FC/OB; C2 AC; EE
+ // C1 FC/IB; C1 XC; C2 FC.
+ // C1 FC/OB; C1 XC; C2 AC; EE
+ // C1 PC
+ // C1 PC/IB.
+ // C1 PC/OB.
+ // C1 PC/IB; C2 FC.
+ // C1 PC/OB; C2 AC; EE
+ // C1 PC/IB; C1 XC; C2 FC.
+ // C1 PC/OB; C1 XC; C2 AC; EE
+ // C1 C2 C1
+ // C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+ // C1 PC/IB; C2 PC/OB; C1 AC; EE
+
+ @Test
+ public void testS__C1_FC_IB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ String output;
+
+ output = byteRead( replay.getContent(), -1 );
+ assertThat( output, is( data ) );
+ }
+
+ @Test
+ public void testB__C1_FC_IB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ String output;
+
+ output = blockRead( replay.getContent(), UTF8, -1, 3 );
+ assertThat( output, is( data ) );
+ }
+
+ @Test
+ public void testS__C1_FC_OB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ String output;
+
+ try {
+ output = byteRead( replay.getContent(), -1 );
+ fail("expected IOException");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testB__C1_FC_OB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ String output;
+
+ try {
+ output = blockRead( replay.getContent(), UTF8, -1, 3 );
+ fail("expected IOException");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testS_C1_FC_IB__C2_FC_IB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ String output;
+
+ output = byteRead( replay.getContent(), -1 );
+ assertThat( output, is( data ) );
+
+ output = byteRead( replay.getContent(), -1 );
+ assertThat( output, is( data ) );
+ }
+
+ @Test
+ public void testB_C1_FC_IB__C2_FC_IB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( "UTF-8" ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ String output;
+
+ output = blockRead( replay.getContent(), UTF8, -1, 3 );
+ assertThat( output, is( data ) );
+
+ output = blockRead( replay.getContent(), UTF8, -1, 3 );
+ assertThat( output, is( data ) );
+ }
+
+ @Test
+ public void testS_C1_FC_OB__C2_AC__EE() throws Exception {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ String output;
+
+ try {
+ output = byteRead( replay.getContent(), -1 );
+ fail( "Expected IOException" );
+ } catch( IOException e ) {
+ // Expected.
+ }
+
+ }
+
+ @Test
+ public void testB_C1_FC_OB__C2_AC__EE() throws Exception {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ String output;
+ try {
+ output = blockRead( replay.getContent(), UTF8, -1, 3 );
+ fail( "Expected IOException" );
+ } catch( IOException e ) {
+ // Expected.
+ }
+ }
+
+ // C1 FC/IB; C1 XC; C2 FC.
+ @Test
+ public void testS_C1_FC_IB__C1_XC__C2_FC() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+ stream = replay.getContent();
+ text = byteRead( stream, -1 );
+ assertThat( text, is( "0123456789" ) );
+ stream.close();
+
+ stream = replay.getContent();
+ text = byteRead( stream, -1 );
+ assertThat( text, is( "0123456789" ) );
+ }
+
+ // C1 FC/IB; C1 XC; C2 FC.
+ @Test
+ public void testB_C1_FC_IB__C1_XC__C2_FC() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ stream = replay.getContent();
+ text = blockRead( stream, UTF8, -1, 3 );
+ assertThat( text, is( "0123456789" ) );
+ stream.close();
+
+ stream = replay.getContent();
+ text = blockRead( stream, UTF8, -1, 3 );
+ assertThat( text, is( "0123456789" ) );
+ }
+
+ // C1 FC/OB; C1 XC; C2 AC; EE
+ @Test
+ public void testS_C1_FC_OB__C1_XC__C2_AC__EE() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ stream = replay.getContent();
+ try {
+ text = byteRead( stream, -1 );
+ fail( "Expected IOException" );
+ } catch( IOException e ) {
+ // Expected.
+ }
+ }
+
+ // C1 FC/OB; C1 XC; C2 AC; EE
+ @Test
+ public void testB_C1_FC_OB__C1_XC__C2_AC_EE() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ stream = replay.getContent();
+ try {
+ text = blockRead( stream, UTF8, -1, 3 );
+ fail( "Expected IOException" );
+ } catch( IOException e ) {
+ // Expected.
+ }
+ }
+
+ // C1 PC/IB.
+ @Test
+ public void testS_C1_PC_IB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ stream = replay.getContent();
+ text = byteRead( stream, 3 );
+ assertThat( text, is( "012" ) );
+ }
+
+ // C1 PC/IB.
+ @Test
+ public void testB_C1_PC_IB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ stream = replay.getContent();
+ text = blockRead( stream, UTF8, 3, 3 );
+ assertThat( text, is( "012" ) );
+ }
+
+ // C1 PC/OB.
+ @Test
+ public void testS_C1_PC_OB() throws IOException {
+
+ try {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent(new ByteArrayInputStream(data.getBytes(UTF8)));
+ replay = new CappedBufferHttpEntity(basic, 5);
+ stream = replay.getContent();
+ text = byteRead(stream, -1);
+ fail("Expected IOException");
+ assertThat(text, is("0123456789"));
+ stream.close();
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ // C1 PC/OB.
+ @Test
+ public void testB_C1_PC_OB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ stream = replay.getContent();
+ try {
+ text = blockRead( stream, UTF8, -1, 4 );
+ fail( "Expected IOException" );
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ // C1 PC/IB; C2 FC.
+ @Test
+ public void testS_C1_PC_IB__C2_FC() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ stream = replay.getContent();
+ text = byteRead( stream, 4 );
+ assertThat( text, is( "0123" ) );
+ stream.close();
+
+ stream = replay.getContent();
+ text = byteRead( stream, -1 );
+ assertThat( text, is( "0123456789" ) );
+ }
+
+ // C1 PC/IB; C2 FC.
+ @Test
+ public void testB_C1_PC_IB__C2_FC() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ stream = replay.getContent();
+ text = blockRead( stream, UTF8, 4, 1 );
+ assertThat( text, is( "0123" ) );
+ stream.close();
+
+ stream = replay.getContent();
+ text = blockRead( stream, UTF8, -1, 7 );
+ assertThat( text, is( "0123456789" ) );
+ }
+
+ // C1 PC/OB; C2 AC; EE
+ @Test
+ public void testS_C1_PC_OB__C2_AC__EE() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ try {
+ basic = new BasicHttpEntity();
+ basic.setContent(new ByteArrayInputStream(data.getBytes(UTF8)));
+ replay = new CappedBufferHttpEntity(basic, 5);
+
+ stream = replay.getContent();
+ text = byteRead(stream, 7);
+ assertThat(text, is("0123456"));
+ stream.close();
+ fail("Expected IOException");
+ } catch (IOException e) {
+ // Expected.
+ }
+ }
+
+ // C1 PC/OB; C2 AC; EE
+ @Test
+ public void testB_C1_PC_OB__C2_AC__EE() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ stream = replay.getContent();
+ try {
+ text = blockRead( stream, UTF8, 7, 2 );
+ fail("Expected IOExceptin");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ // C1 PC/IB; C1 XC; C2 FC.
+ @Test
+ public void testS_C1_PC_IB__C1_XC__C2_FC() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ stream = replay.getContent();
+ text = byteRead( stream, 7 );
+ assertThat( text, is( "0123456" ) );
+ stream.close();
+
+ stream = replay.getContent();
+ text = byteRead( stream, -1 );
+ assertThat( text, is( "0123456789" ) );
+ }
+
+ // C1 PC/IB; C1 XC; C2 FC.
+ @Test
+ public void testB_C1_PC_IB__C1_XC__C2_FC() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ stream = replay.getContent();
+ text = blockRead( stream, UTF8, 7, 2 );
+ assertThat( text, is( "0123456" ) );
+ stream.close();
+
+ stream = replay.getContent();
+ text = blockRead( stream, UTF8, -1, 7 );
+ assertThat( text, is( "0123456789" ) );
+ }
+
+ // C1 PC/OB; C1 XC; C2 AC; EE
+ @Test
+ public void testS_C1_PC_OB__C1_XC__C2_AC__EE() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ try {
+ stream = replay.getContent();
+ } catch ( IOException e ) {
+ // Expected.
+ }
+ }
+
+ // C1 PC/OB; C1 XC; C2 AC; EE
+ @Test
+ public void testB_C1_PC_OB__C1_XC__C2_AC__EE() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ stream = replay.getContent();
+ try {
+ text = blockRead( stream, UTF8, 7, 2 );
+ fail( "Expected IOException" );
+ } catch ( IOException e ) {
+ // Expected.
+ }
+ }
+
+ // C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+ @Test
+ public void testS_C1_PC_IB__C2_PC_IB__C2_PC_IB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream1, stream2;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+
+ stream1 = replay.getContent();
+ text = byteRead( stream1, 3 );
+ assertThat( text, is( "012" ) );
+
+ stream2 = replay.getContent();
+ text = byteRead( stream2, 4 );
+ assertThat( text, is( "0123" ) );
+
+ text = byteRead( stream1, 3 );
+ assertThat( text, is( "345" ) );
+ }
+
+ // C1 PC/IB; C2 PC/IB; C1 PC/IB; C2 PC/IB - Back and forth before buffer overflow is OK.
+ @Test
+ public void testB_C1_PC_IB__C2_PC_IB__C2_PC_IB() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream1, stream2;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 20 );
+ stream1 = replay.getContent();
+ text = blockRead( stream1, UTF8, 3, 2 );
+ assertThat( text, is( "012" ) );
+
+ stream2 = replay.getContent();
+ text = blockRead( stream2, UTF8, 4, 3 );
+ assertThat( text, is( "0123" ) );
+
+ text = blockRead( stream1, UTF8, 3, 2 );
+ assertThat( text, is( "345" ) );
+ }
+
+ // C1 PC/IB; C2 PC/OB; C1 AC; EE
+ @Test
+ public void testS_C1_PC_IB__C2_PC_OB__C1_AC__EE() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream1, stream2;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ stream1 = replay.getContent();
+ text = byteRead( stream1, 3 );
+ assertThat( text, is( "012" ) );
+
+ stream2 = replay.getContent();
+ text = byteRead( stream2, 5 );
+ assertThat( text, is( "01234" ) );
+ }
+
+ // C1 PC/IB; C2 PC/OB; C1 AC; EE
+ @Test
+ public void testB_C1_PC_IB__C2_PC_OB__C1_AC__EE() throws IOException {
+ String data = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+ InputStream stream1, stream2;
+ String text;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( data.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ stream1 = replay.getContent();
+ text = blockRead( stream1, UTF8, 3, 2 );
+ assertThat( text, is( "012" ) );
+
+ stream2 = replay.getContent();
+ try {
+ text = blockRead( stream2, UTF8, 6, 4 );
+ fail("expected IOException");
+ } catch (IOException e) {
+ // expected
+ }
+
+ }
+
+ @Test
+ public void testWriteTo() throws Exception {
+ String input = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ try {
+ replay.writeTo( buffer );
+ fail("expected IOException");
+ } catch (IOException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testIsRepeatable() throws Exception {
+ String text = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( text.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic );
+ assertThat( replay.isRepeatable(), is( true ) );
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( text.getBytes( UTF8 ) ) );
+ BufferedHttpEntity buffered = new BufferedHttpEntity( basic );
+ replay = new CappedBufferHttpEntity( buffered );
+ assertThat( replay.isRepeatable(), is( true ) );
+ }
+
+ @Test
+ public void testIsChunked() throws Exception {
+ String input = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.isChunked(), is( false ) );
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ basic.setChunked( true );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.isChunked(), is( true ) );
+ }
+
+ @Test
+ public void testGetContentLength() throws Exception {
+ String input = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.getContentLength(), is( -1L ) );
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ basic.setContentLength( input.length() );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.getContentLength(), is( 10L ) );
+ }
+
+ @Test
+ public void testGetContentType() throws Exception {
+ String input = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.getContentType(), nullValue() );
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ basic.setContentType( ContentType.APPLICATION_JSON.getMimeType() );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.getContentType().getValue(), is( "application/json" ) );
+ }
+
+ @Test
+ public void testGetContentEncoding() throws Exception {
+ String input = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.getContentEncoding(), nullValue() );
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ basic.setContentEncoding( "UTF-8" );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.getContentEncoding().getValue(), is( "UTF-8" ) );
+ }
+
+ @Test
+ public void testIsStreaming() throws Exception {
+ String input = "0123456789";
+ BasicHttpEntity basic;
+ InputStreamEntity streaming;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.isStreaming(), is( true ) );
+
+ basic = new BasicHttpEntity();
+ basic.setContent( null );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+ assertThat( replay.isStreaming(), is( false ) );
+
+ streaming = new InputStreamEntity( new ByteArrayInputStream( input.getBytes( UTF8 ) ), 10, ContentType.TEXT_PLAIN );
+ replay = new CappedBufferHttpEntity( streaming, 5 );
+ assertThat( replay.isStreaming(), is( true ) );
+ }
+
+ @Test
+ public void testConsumeContent() throws Exception {
+ String input = "0123456789";
+ BasicHttpEntity basic;
+ CappedBufferHttpEntity replay;
+
+ basic = new BasicHttpEntity();
+ basic.setContent( new ByteArrayInputStream( input.getBytes( UTF8 ) ) );
+ replay = new CappedBufferHttpEntity( basic, 5 );
+
+ try {
+ replay.consumeContent();
+ fail( "Expected UnsupportedOperationException" );
+ } catch ( UnsupportedOperationException e ) {
+ // Expected.
+ }
+ }
+
+ private static String byteRead( InputStream stream, int total ) throws IOException {
+ StringBuilder string = null;
+ int c = 0;
+ if( total < 0 ) {
+ total = Integer.MAX_VALUE;
+ }
+ while( total > 0 && c >= 0 ) {
+ c = stream.read();
+ if( c >= 0 ) {
+ total--;
+ if( string == null ) {
+ string = new StringBuilder();
+ }
+ string.append( (char)c );
+ }
+ }
+ return string == null ? null : string.toString();
+ }
+
+ private static String blockRead( InputStream stream, Charset charset, int total, int chunk ) throws IOException {
+ StringBuilder string = null;
+ byte buffer[] = new byte[ chunk ];
+ int count = 0;
+ if( total < 0 ) {
+ total = Integer.MAX_VALUE;
+ }
+ while( total > 0 && count >= 0 ) {
+ count = stream.read( buffer, 0, Math.min( buffer.length, total ) );
+ if( count >= 0 ) {
+ total -= count;
+ if( string == null ) {
+ string = new StringBuilder();
+ }
+ string.append( new String( buffer, 0, count, charset ) );
+ }
+ }
+ return string == null ? null : string.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
index d429deb..660a627 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
@@ -99,13 +99,14 @@ public class HttpClientDispatchTest {
HttpEntity httpEntity = httpClientDispatch.createRequestEntity(inboundRequest);
System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "false");
assertFalse("buffering in the presence of delegation token",
- (httpEntity instanceof PartiallyRepeatableHttpEntity));
+ (httpEntity instanceof CappedBufferHttpEntity));
}
@Test
public void testCallToSecureClusterWithoutDelegationTpken() throws URISyntaxException, IOException {
System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "true");
HttpClientDispatch httpClientDispatch = new HttpClientDispatch();
+ httpClientDispatch.setReplayBufferSize(10);
ServletInputStream inputStream = EasyMock.createNiceMock( ServletInputStream.class );
HttpServletRequest inboundRequest = EasyMock.createNiceMock( HttpServletRequest.class );
EasyMock.expect(inboundRequest.getQueryString()).andReturn( "a=123").anyTimes();
@@ -114,7 +115,7 @@ public class HttpClientDispatchTest {
HttpEntity httpEntity = httpClientDispatch.createRequestEntity(inboundRequest);
System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "false");
assertTrue("not buffering in the absence of delegation token",
- (httpEntity instanceof PartiallyRepeatableHttpEntity));
+ (httpEntity instanceof CappedBufferHttpEntity));
}
http://git-wip-us.apache.org/repos/asf/incubator-knox/blob/61d3d15a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java b/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
index 7215945..fe78645 100644
--- a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
+++ b/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
@@ -17,14 +17,6 @@
*/
package org.apache.hadoop.gateway.oozie;
-import org.apache.hadoop.gateway.deploy.DeploymentContext;
-import org.apache.hadoop.gateway.deploy.ServiceDeploymentContributorBase;
-import org.apache.hadoop.gateway.descriptor.FilterParamDescriptor;
-import org.apache.hadoop.gateway.descriptor.ResourceDescriptor;
-import org.apache.hadoop.gateway.filter.rewrite.api.UrlRewriteRulesDescriptor;
-import org.apache.hadoop.gateway.filter.rewrite.api.UrlRewriteRulesDescriptorFactory;
-import org.apache.hadoop.gateway.topology.Service;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -33,10 +25,23 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.gateway.deploy.DeploymentContext;
+import org.apache.hadoop.gateway.deploy.ServiceDeploymentContributorBase;
+import org.apache.hadoop.gateway.descriptor.FilterParamDescriptor;
+import org.apache.hadoop.gateway.descriptor.ResourceDescriptor;
+import org.apache.hadoop.gateway.filter.rewrite.api.UrlRewriteRulesDescriptor;
+import org.apache.hadoop.gateway.filter.rewrite.api.UrlRewriteRulesDescriptorFactory;
+import org.apache.hadoop.gateway.topology.Service;
+
public class OozieDeploymentContributor extends ServiceDeploymentContributorBase {
private static final String RULES_RESOURCE = OozieDeploymentContributor.class.getName().replace( '.', '/' ) + "/rewrite.xml";
private static final String EXTERNAL_PATH = "/oozie";
+
+ private static final String REPLAY_BUFFER_SIZE_PARAM = "replayBufferSize";
+
+ // Oozie replay buffer size in KB
+ private static final String OOZIW_REPLAY_BUFFER_SIZE = "8";
@Override
public String getRole() {
@@ -96,8 +101,14 @@ public class OozieDeploymentContributor extends ServiceDeploymentContributorBase
context.contributeFilter( service, resource, "rewrite", null, params );
}
- private void addDispatchFilter(DeploymentContext context, Service service, ResourceDescriptor resource ) {
- context.contributeFilter( service, resource, "dispatch", null, null );
+ private void addDispatchFilter(DeploymentContext context, Service service,
+ ResourceDescriptor resource) {
+ List<FilterParamDescriptor> filterParams = new ArrayList<FilterParamDescriptor>();
+ FilterParamDescriptor filterParamDescriptor = resource.createFilterParam();
+ filterParamDescriptor.name(REPLAY_BUFFER_SIZE_PARAM);
+ filterParamDescriptor.value(OOZIW_REPLAY_BUFFER_SIZE);
+ filterParams.add(filterParamDescriptor);
+ context.contributeFilter(service, resource, "dispatch", null, filterParams);
}
UrlRewriteRulesDescriptor loadRulesFromTemplate() throws IOException {