You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by su...@apache.org on 2015/04/08 22:27:05 UTC
[1/2] knox git commit: KNOX-526 Added new dispatch classes for
backward compatibility
Repository: knox
Updated Branches:
refs/heads/master 2376b9565 -> 016a47dc6
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
index 7f4d27c..46fb43e 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatch.java
@@ -17,302 +17,19 @@
*/
package org.apache.hadoop.gateway.dispatch;
-import org.apache.hadoop.gateway.SpiGatewayMessages;
-import org.apache.hadoop.gateway.SpiGatewayResources;
-import org.apache.hadoop.gateway.audit.api.Action;
-import org.apache.hadoop.gateway.audit.api.ActionOutcome;
-import org.apache.hadoop.gateway.audit.api.AuditServiceFactory;
-import org.apache.hadoop.gateway.audit.api.Auditor;
-import org.apache.hadoop.gateway.audit.api.ResourceType;
-import org.apache.hadoop.gateway.audit.log4j.audit.AuditConstants;
-import org.apache.hadoop.gateway.config.Configure;
-import org.apache.hadoop.gateway.config.Default;
-import org.apache.hadoop.gateway.config.GatewayConfig;
-import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
-import org.apache.hadoop.gateway.i18n.resources.ResourcesFactory;
-import org.apache.http.Header;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpDelete;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpOptions;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.InputStreamEntity;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.message.BasicHeader;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- *
+/***
+ * KNOX-526. This class is kept only for backward compatibility, so that topologies
+ * deployed with releases older than Apache Knox 0.6.0 continue to work.
*/
-public class HttpClientDispatch extends AbstractGatewayDispatch {
-
- // private static final String CT_APP_WWW_FORM_URL_ENCODED = "application/x-www-form-urlencoded";
- // private static final String CT_APP_XML = "application/xml";
- protected static final String Q_DELEGATION_EQ = "?delegation=";
- protected static final String AMP_DELEGATION_EQ = "&delegation=";
- protected static final String COOKIE = "Cookie";
- protected static final String SET_COOKIE = "Set-Cookie";
- protected static final String WWW_AUTHENTICATE = "WWW-Authenticate";
- protected static final String NEGOTIATE = "Negotiate";
-
- protected static SpiGatewayMessages LOG = MessagesFactory.get(SpiGatewayMessages.class);
- protected static SpiGatewayResources RES = ResourcesFactory.get(SpiGatewayResources.class);
- protected static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(AuditConstants.DEFAULT_AUDITOR_NAME,
- AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME);
-
- protected AppCookieManager appCookieManager;
-
- private int replayBufferSize = 0;
- private Set<String> outboundResponseExcludeHeaders;
-
- @Override
- public void init() {
- setAppCookieManager(new AppCookieManager());
- outboundResponseExcludeHeaders = new HashSet<String>();
- outboundResponseExcludeHeaders.add(SET_COOKIE);
- outboundResponseExcludeHeaders.add(WWW_AUTHENTICATE);
- }
+@Deprecated
+public class HttpClientDispatch extends GatewayDispatchFilter {
@Override
- public void destroy() {
-
- }
-
- public void setAppCookieManager(AppCookieManager appCookieManager) {
- this.appCookieManager = appCookieManager;
- }
-
- protected void executeRequest(
- HttpUriRequest outboundRequest,
- HttpServletRequest inboundRequest,
- HttpServletResponse outboundResponse)
- throws IOException {
- HttpResponse inboundResponse = executeOutboundRequest(outboundRequest);
- writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
- }
-
- protected HttpResponse executeOutboundRequest(HttpUriRequest outboundRequest) throws IOException {
- LOG.dispatchRequest(outboundRequest.getMethod(), outboundRequest.getURI());
- HttpResponse inboundResponse = null;
-
- try {
- String query = outboundRequest.getURI().getQuery();
- if (!"true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
- // Hadoop cluster not Kerberos enabled
- addCredentialsToRequest(outboundRequest);
- inboundResponse = client.execute(outboundRequest);
- } else if (query.contains(Q_DELEGATION_EQ) ||
- // query string carries delegation token
- query.contains(AMP_DELEGATION_EQ)) {
- inboundResponse = client.execute(outboundRequest);
- } else {
- // Kerberos secured, no delegation token in query string
- inboundResponse = executeKerberosDispatch(outboundRequest, client);
- }
- } catch (IOException e) {
- // we do not want to expose back end host. port end points to clients, see JIRA KNOX-58
- LOG.dispatchServiceConnectionException(outboundRequest.getURI(), e);
- auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.FAILURE);
- throw new IOException(RES.dispatchConnectionError());
- } finally {
- if (inboundResponse != null) {
- int statusCode = inboundResponse.getStatusLine().getStatusCode();
- if (statusCode != 201) {
- LOG.dispatchResponseStatusCode(statusCode);
- } else {
- Header location = inboundResponse.getFirstHeader("Location");
- if (location == null) {
- LOG.dispatchResponseStatusCode(statusCode);
- } else {
- LOG.dispatchResponseCreatedStatusCode(statusCode, location.getValue());
- }
- }
- auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.SUCCESS, RES.responseStatus(statusCode));
- } else {
- auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.UNAVAILABLE);
- }
-
- }
- return inboundResponse;
- }
-
- protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException {
- // Copy the client respond header to the server respond.
- outboundResponse.setStatus(inboundResponse.getStatusLine().getStatusCode());
- Header[] headers = inboundResponse.getAllHeaders();
- Set<String> excludeHeaders = getOutboundResponseExcludeHeaders();
- boolean hasExcludeHeaders = false;
- if ((excludeHeaders != null) && !(excludeHeaders.isEmpty())) {
- hasExcludeHeaders = true;
- }
- for ( Header header : headers ) {
- String name = header.getName();
- if (hasExcludeHeaders && excludeHeaders.contains(name)) {
- continue;
- }
- String value = header.getValue();
- outboundResponse.addHeader(name, value);
- }
-
- HttpEntity entity = inboundResponse.getEntity();
- if ( entity != null ) {
- Header contentType = entity.getContentType();
- if ( contentType != null ) {
- outboundResponse.setContentType(contentType.getValue());
- }
- //KM[ If this is set here it ends up setting the content length to the content returned from the server.
- // This length might not match if the the content is rewritten.
- // long contentLength = entity.getContentLength();
- // if( contentLength <= Integer.MAX_VALUE ) {
- // outboundResponse.setContentLength( (int)contentLength );
- // }
- //]
- writeResponse(inboundRequest, outboundResponse, entity.getContent());
- }
- }
-
- /**
- * This method provides a hook for specialized credential propagation
- * in subclasses.
- *
- * @param outboundRequest
- */
- protected void addCredentialsToRequest(HttpUriRequest outboundRequest) {
- }
-
- protected HttpResponse executeKerberosDispatch(HttpUriRequest outboundRequest,
- HttpClient client) throws IOException {
- HttpResponse inboundResponse;
- outboundRequest.removeHeaders(COOKIE);
- String appCookie = appCookieManager.getCachedAppCookie();
- if (appCookie != null) {
- outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie));
- }
- inboundResponse = client.execute(outboundRequest);
- // if inBoundResponse has status 401 and header WWW-Authenticate: Negoitate
- // refresh hadoop.auth.cookie and attempt one more time
- int statusCode = inboundResponse.getStatusLine().getStatusCode();
- if (statusCode == HttpStatus.SC_UNAUTHORIZED) {
- Header[] wwwAuthHeaders = inboundResponse.getHeaders(WWW_AUTHENTICATE);
- if (wwwAuthHeaders != null && wwwAuthHeaders.length != 0 &&
- wwwAuthHeaders[0].getValue().trim().startsWith(NEGOTIATE)) {
- appCookie = appCookieManager.getAppCookie(outboundRequest, true);
- outboundRequest.removeHeaders(COOKIE);
- outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie));
- client = new DefaultHttpClient();
- inboundResponse = client.execute(outboundRequest);
- } else {
- // no supported authentication type found
- // we would let the original response propagate
- }
- } else {
- // not a 401 Unauthorized status code
- // we would let the original response propagate
- }
- return inboundResponse;
- }
-
- protected HttpEntity createRequestEntity(HttpServletRequest request)
- throws IOException {
-
- String contentType = request.getContentType();
- int contentLength = request.getContentLength();
- InputStream contentStream = request.getInputStream();
-
- HttpEntity entity;
- if (contentType == null) {
- entity = new InputStreamEntity(contentStream, contentLength);
- } else {
- entity = new InputStreamEntity(contentStream, contentLength, ContentType.parse(contentType));
- }
-
-
- if ("true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
-
- //Check if delegation token is supplied in the request
- boolean delegationTokenPresent = false;
- String queryString = request.getQueryString();
- if (queryString != null) {
- delegationTokenPresent = queryString.startsWith("delegation=") ||
- queryString.contains("&delegation=");
- }
- if (!delegationTokenPresent && getReplayBufferSize() > 0) {
- entity = new CappedBufferHttpEntity(entity, getReplayBufferSize() * 1024);
- }
- }
-
- return entity;
- }
-
- @Override
- public void doGet(URI url, HttpServletRequest request, HttpServletResponse response)
- throws IOException, URISyntaxException {
- HttpGet method = new HttpGet(url);
- // https://issues.apache.org/jira/browse/KNOX-107 - Service URLs not rewritten for WebHDFS GET redirects
- method.getParams().setBooleanParameter("http.protocol.handle-redirects", false);
- copyRequestHeaderFields(method, request);
- executeRequest(method, request, response);
- }
-
- @Override
- public void doOptions(URI url, HttpServletRequest request, HttpServletResponse response)
- throws IOException, URISyntaxException {
- HttpOptions method = new HttpOptions(url);
- executeRequest(method, request, response);
- }
-
- @Override
- public void doPut(URI url, HttpServletRequest request, HttpServletResponse response)
- throws IOException, URISyntaxException {
- HttpPut method = new HttpPut(url);
- HttpEntity entity = createRequestEntity(request);
- method.setEntity(entity);
- copyRequestHeaderFields(method, request);
- executeRequest(method, request, response);
- }
-
- @Override
- public void doPost(URI url, HttpServletRequest request, HttpServletResponse response)
- throws IOException, URISyntaxException {
- HttpPost method = new HttpPost(url);
- HttpEntity entity = createRequestEntity(request);
- method.setEntity(entity);
- copyRequestHeaderFields(method, request);
- executeRequest(method, request, response);
- }
-
- @Override
- public void doDelete(URI url, HttpServletRequest request, HttpServletResponse response)
- throws IOException, URISyntaxException {
- HttpDelete method = new HttpDelete(url);
- copyRequestHeaderFields(method, request);
- executeRequest(method, request, response);
- }
-
- protected int getReplayBufferSize() {
- return replayBufferSize;
- }
-
- @Configure
- protected void setReplayBufferSize(@Default("8") int size) {
- replayBufferSize = size;
- }
-
- public Set<String> getOutboundResponseExcludeHeaders() {
- return outboundResponseExcludeHeaders;
+ public void init(FilterConfig filterConfig) throws ServletException {
+ setDispatch(new DefaultDispatch());
+ super.init(filterConfig);
}
}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/DefaultDispatchTest.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/DefaultDispatchTest.java b/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/DefaultDispatchTest.java
new file mode 100644
index 0000000..592db57
--- /dev/null
+++ b/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/DefaultDispatchTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+public class DefaultDispatchTest {
+
+ // Make sure Hadoop cluster topology isn't exposed to client when there is a connectivity issue.
+ @Test
+ public void testJiraKnox58() throws URISyntaxException, IOException {
+
+ URI uri = new URI( "http://unreachable-host" );
+ BasicHttpParams params = new BasicHttpParams();
+
+ HttpUriRequest outboundRequest = EasyMock.createNiceMock( HttpUriRequest.class );
+ EasyMock.expect( outboundRequest.getMethod() ).andReturn( "GET" ).anyTimes();
+ EasyMock.expect( outboundRequest.getURI() ).andReturn( uri ).anyTimes();
+ EasyMock.expect( outboundRequest.getParams() ).andReturn( params ).anyTimes();
+
+ HttpServletRequest inboundRequest = EasyMock.createNiceMock( HttpServletRequest.class );
+
+ HttpServletResponse outboundResponse = EasyMock.createNiceMock( HttpServletResponse.class );
+ EasyMock.expect( outboundResponse.getOutputStream() ).andAnswer( new IAnswer<ServletOutputStream>() {
+ @Override
+ public ServletOutputStream answer() throws Throwable {
+ return new ServletOutputStream() {
+ @Override
+ public void write( int b ) throws IOException {
+ throw new IOException( "unreachable-host" );
+ }
+ };
+ }
+ });
+
+ EasyMock.replay( outboundRequest, inboundRequest, outboundResponse );
+
+ DefaultDispatch dispatch = new DefaultDispatch();
+ dispatch.setHttpClient(new DefaultHttpClient());
+ try {
+ dispatch.executeRequest( outboundRequest, inboundRequest, outboundResponse );
+ fail( "Should have thrown IOException" );
+ } catch( IOException e ) {
+ assertThat( e.getMessage(), not( containsString( "unreachable-host" ) ) );
+ assertThat( e, not( instanceOf( UnknownHostException.class ) ) ) ;
+ assertThat( "Message needs meaningful content.", e.getMessage().trim().length(), greaterThan( 12 ) );
+ }
+ }
+
+ @Test
+ public void testCallToSecureClusterWithDelegationTpken() throws URISyntaxException, IOException {
+ System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "true");
+ DefaultDispatch defaultDispatch = new DefaultDispatch();
+ ServletInputStream inputStream = EasyMock.createNiceMock( ServletInputStream.class );
+ HttpServletRequest inboundRequest = EasyMock.createNiceMock( HttpServletRequest.class );
+ EasyMock.expect(inboundRequest.getQueryString()).andReturn( "delegation=123").anyTimes();
+ EasyMock.expect(inboundRequest.getInputStream()).andReturn( inputStream).anyTimes();
+ EasyMock.replay( inboundRequest );
+ HttpEntity httpEntity = defaultDispatch.createRequestEntity(inboundRequest);
+ System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "false");
+ assertFalse("buffering in the presence of delegation token",
+ (httpEntity instanceof CappedBufferHttpEntity));
+ }
+
+ @Test
+ public void testCallToSecureClusterWithoutDelegationTpken() throws URISyntaxException, IOException {
+ System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "true");
+ DefaultDispatch defaultDispatch = new DefaultDispatch();
+ defaultDispatch.setReplayBufferSize(10);
+ ServletInputStream inputStream = EasyMock.createNiceMock( ServletInputStream.class );
+ HttpServletRequest inboundRequest = EasyMock.createNiceMock( HttpServletRequest.class );
+ EasyMock.expect(inboundRequest.getQueryString()).andReturn( "a=123").anyTimes();
+ EasyMock.expect(inboundRequest.getInputStream()).andReturn( inputStream).anyTimes();
+ EasyMock.replay( inboundRequest );
+ HttpEntity httpEntity = defaultDispatch.createRequestEntity(inboundRequest);
+ System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "false");
+ assertTrue("not buffering in the absence of delegation token",
+ (httpEntity instanceof CappedBufferHttpEntity));
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java b/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
deleted file mode 100644
index 9446ab5..0000000
--- a/gateway-spi/src/test/java/org/apache/hadoop/gateway/dispatch/HttpClientDispatchTest.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.gateway.dispatch;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-
-import javax.servlet.ServletInputStream;
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.hadoop.gateway.config.GatewayConfig;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.params.BasicHttpParams;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Test;
-
-public class HttpClientDispatchTest {
-
- // Make sure Hadoop cluster topology isn't exposed to client when there is a connectivity issue.
- @Test
- public void testJiraKnox58() throws URISyntaxException, IOException {
-
- URI uri = new URI( "http://unreachable-host" );
- BasicHttpParams params = new BasicHttpParams();
-
- HttpUriRequest outboundRequest = EasyMock.createNiceMock( HttpUriRequest.class );
- EasyMock.expect( outboundRequest.getMethod() ).andReturn( "GET" ).anyTimes();
- EasyMock.expect( outboundRequest.getURI() ).andReturn( uri ).anyTimes();
- EasyMock.expect( outboundRequest.getParams() ).andReturn( params ).anyTimes();
-
- HttpServletRequest inboundRequest = EasyMock.createNiceMock( HttpServletRequest.class );
-
- HttpServletResponse outboundResponse = EasyMock.createNiceMock( HttpServletResponse.class );
- EasyMock.expect( outboundResponse.getOutputStream() ).andAnswer( new IAnswer<ServletOutputStream>() {
- @Override
- public ServletOutputStream answer() throws Throwable {
- return new ServletOutputStream() {
- @Override
- public void write( int b ) throws IOException {
- throw new IOException( "unreachable-host" );
- }
- };
- }
- });
-
- EasyMock.replay( outboundRequest, inboundRequest, outboundResponse );
-
- HttpClientDispatch dispatch = new HttpClientDispatch();
- dispatch.setHttpClient(new DefaultHttpClient());
- try {
- dispatch.executeRequest( outboundRequest, inboundRequest, outboundResponse );
- fail( "Should have thrown IOException" );
- } catch( IOException e ) {
- assertThat( e.getMessage(), not( containsString( "unreachable-host" ) ) );
- assertThat( e, not( instanceOf( UnknownHostException.class ) ) ) ;
- assertThat( "Message needs meaningful content.", e.getMessage().trim().length(), greaterThan( 12 ) );
- }
- }
-
- @Test
- public void testCallToSecureClusterWithDelegationTpken() throws URISyntaxException, IOException {
- System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "true");
- HttpClientDispatch httpClientDispatch = new HttpClientDispatch();
- ServletInputStream inputStream = EasyMock.createNiceMock( ServletInputStream.class );
- HttpServletRequest inboundRequest = EasyMock.createNiceMock( HttpServletRequest.class );
- EasyMock.expect(inboundRequest.getQueryString()).andReturn( "delegation=123").anyTimes();
- EasyMock.expect(inboundRequest.getInputStream()).andReturn( inputStream).anyTimes();
- EasyMock.replay( inboundRequest );
- HttpEntity httpEntity = httpClientDispatch.createRequestEntity(inboundRequest);
- System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "false");
- assertFalse("buffering in the presence of delegation token",
- (httpEntity instanceof CappedBufferHttpEntity));
- }
-
- @Test
- public void testCallToSecureClusterWithoutDelegationTpken() throws URISyntaxException, IOException {
- System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "true");
- HttpClientDispatch httpClientDispatch = new HttpClientDispatch();
- httpClientDispatch.setReplayBufferSize(10);
- ServletInputStream inputStream = EasyMock.createNiceMock( ServletInputStream.class );
- HttpServletRequest inboundRequest = EasyMock.createNiceMock( HttpServletRequest.class );
- EasyMock.expect(inboundRequest.getQueryString()).andReturn( "a=123").anyTimes();
- EasyMock.expect(inboundRequest.getInputStream()).andReturn( inputStream).anyTimes();
- EasyMock.replay( inboundRequest );
- HttpEntity httpEntity = httpClientDispatch.createRequestEntity(inboundRequest);
- System.setProperty(GatewayConfig.HADOOP_KERBEROS_SECURED, "false");
- assertTrue("not buffering in the absence of delegation token",
- (httpEntity instanceof CappedBufferHttpEntity));
- }
-
-
-}
[2/2] knox git commit: KNOX-526 Added new dispatch classes for
backward compatibility
Posted by su...@apache.org.
KNOX-526 Added new dispatch classes for backward compatibility
Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/016a47dc
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/016a47dc
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/016a47dc
Branch: refs/heads/master
Commit: 016a47dc6b08192a4d762ad305313519dc2f31a5
Parents: 2376b95
Author: Sumit Gupta <su...@apache.org>
Authored: Wed Apr 8 16:08:37 2015 -0400
Committer: Sumit Gupta <su...@apache.org>
Committed: Wed Apr 8 16:08:37 2015 -0400
----------------------------------------------------------------------
.../impl/DispatchDeploymentContributor.java | 4 +-
.../apache/hadoop/gateway/AuditLoggingTest.java | 4 +-
.../resources/services/hbase/0.98.0/service.xml | 2 +-
.../resources/services/hive/0.13.0/service.xml | 2 +-
.../resources/services/storm/0.9.3/services.xml | 2 +-
.../services/webhdfs/2.4.0/service.xml | 2 +-
.../definition/ServiceDefinitionTest.java | 6 +-
.../hadoop/gateway/hbase/HBaseDispatch.java | 35 ++
.../gateway/hbase/HBaseHttpClientDispatch.java | 18 +-
.../hadoop/gateway/hive/HiveDispatch.java | 119 +++++++
.../gateway/hive/HiveHttpClientDispatch.java | 103 +-----
.../hadoop/gateway/storm/StormDispatch.java | 35 ++
.../gateway/storm/StormHttpClientDispatch.java | 35 --
.../gateway/hdfs/dispatch/HdfsDispatch.java | 34 +-
.../hdfs/dispatch/HdfsHttpClientDispatch.java | 45 +++
.../hdfs/dispatch/WebHdfsHaDispatch.java | 195 ++++++++++++
.../dispatch/WebHdfsHaHttpClientDispatch.java | 179 +----------
.../hdfs/dispatch/WebHdfsHaDispatchTest.java | 141 ++++++++
.../WebHdfsHaHttpClientDispatchTest.java | 141 --------
.../dispatch/AbstractGatewayDispatch.java | 4 +-
.../gateway/dispatch/DefaultDispatch.java | 318 +++++++++++++++++++
.../gateway/dispatch/GatewayDispatchFilter.java | 6 +-
.../gateway/dispatch/HttpClientDispatch.java | 303 +-----------------
.../gateway/dispatch/DefaultDispatchTest.java | 123 +++++++
.../dispatch/HttpClientDispatchTest.java | 123 -------
25 files changed, 1081 insertions(+), 898 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
index b7dccb0..3e1c336 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.gateway.descriptor.FilterDescriptor;
import org.apache.hadoop.gateway.descriptor.FilterParamDescriptor;
import org.apache.hadoop.gateway.descriptor.ResourceDescriptor;
import org.apache.hadoop.gateway.dispatch.GatewayDispatchFilter;
-import org.apache.hadoop.gateway.dispatch.HttpClientDispatch;
+import org.apache.hadoop.gateway.dispatch.DefaultDispatch;
import org.apache.hadoop.gateway.topology.Provider;
import org.apache.hadoop.gateway.topology.Service;
@@ -52,7 +52,7 @@ public class DispatchDeploymentContributor extends ProviderDeploymentContributor
@Override
public void contributeFilter( DeploymentContext context, Provider provider, Service service, ResourceDescriptor resource, List<FilterParamDescriptor> params ) {
FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( GatewayDispatchFilter.class );
- filter.param().name(DISPATCH_IMPL_PARAM).value(HttpClientDispatch.class.getName());
+ filter.param().name(DISPATCH_IMPL_PARAM).value(DefaultDispatch.class.getName());
FilterParamDescriptor filterParam = filter.param().name( REPLAY_BUFFER_SIZE_PARAM ).value( DEFAULT_REPLAY_BUFFER_SIZE );
for ( Map.Entry<String,String> serviceParam : service.getParams().entrySet() ) {
if ( REPLAY_BUFFER_SIZE_PARAM.equals( serviceParam.getKey() ) ) {
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-server/src/test/java/org/apache/hadoop/gateway/AuditLoggingTest.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/AuditLoggingTest.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/AuditLoggingTest.java
index 8d88ec0..7618253 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/AuditLoggingTest.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/AuditLoggingTest.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.gateway.audit.api.ResourceType;
import org.apache.hadoop.gateway.audit.log4j.audit.AuditConstants;
import org.apache.hadoop.gateway.audit.log4j.audit.Log4jAuditService;
import org.apache.hadoop.gateway.audit.log4j.correlation.Log4jCorrelationService;
-import org.apache.hadoop.gateway.dispatch.HttpClientDispatch;
+import org.apache.hadoop.gateway.dispatch.DefaultDispatch;
import org.apache.hadoop.gateway.i18n.resources.ResourcesFactory;
import org.apache.hadoop.test.log.CollectAppender;
import org.apache.http.impl.client.DefaultHttpClient;
@@ -171,7 +171,7 @@ public class AuditLoggingTest {
HttpServletResponse outboundResponse = EasyMock.createNiceMock( HttpServletResponse.class );
EasyMock.replay( outboundResponse );
- HttpClientDispatch dispatch = new HttpClientDispatch();
+ DefaultDispatch dispatch = new DefaultDispatch();
dispatch.setHttpClient(new DefaultHttpClient());
try {
dispatch.doGet( new URI( uri ), inboundRequest, outboundResponse );
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-definitions/src/main/resources/services/hbase/0.98.0/service.xml
----------------------------------------------------------------------
diff --git a/gateway-service-definitions/src/main/resources/services/hbase/0.98.0/service.xml b/gateway-service-definitions/src/main/resources/services/hbase/0.98.0/service.xml
index e03bca7..37d49bb 100644
--- a/gateway-service-definitions/src/main/resources/services/hbase/0.98.0/service.xml
+++ b/gateway-service-definitions/src/main/resources/services/hbase/0.98.0/service.xml
@@ -29,5 +29,5 @@
<rewrite apply="WEBHBASE/webhbase/regions/outbound" to="response.body"/>
</route>
</routes>
- <dispatch classname="org.apache.hadoop.gateway.hbase.HBaseHttpClientDispatch"/>
+ <dispatch classname="org.apache.hadoop.gateway.hbase.HBaseDispatch"/>
</service>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-definitions/src/main/resources/services/hive/0.13.0/service.xml
----------------------------------------------------------------------
diff --git a/gateway-service-definitions/src/main/resources/services/hive/0.13.0/service.xml b/gateway-service-definitions/src/main/resources/services/hive/0.13.0/service.xml
index 32d1560..aba1d23 100644
--- a/gateway-service-definitions/src/main/resources/services/hive/0.13.0/service.xml
+++ b/gateway-service-definitions/src/main/resources/services/hive/0.13.0/service.xml
@@ -18,5 +18,5 @@
<routes>
<route path="/hive"/>
</routes>
- <dispatch classname="org.apache.hadoop.gateway.hive.HiveHttpClientDispatch"/>
+ <dispatch classname="org.apache.hadoop.gateway.hive.HiveDispatch"/>
</service>
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-definitions/src/main/resources/services/storm/0.9.3/services.xml
----------------------------------------------------------------------
diff --git a/gateway-service-definitions/src/main/resources/services/storm/0.9.3/services.xml b/gateway-service-definitions/src/main/resources/services/storm/0.9.3/services.xml
index c813265..ffe20a4 100644
--- a/gateway-service-definitions/src/main/resources/services/storm/0.9.3/services.xml
+++ b/gateway-service-definitions/src/main/resources/services/storm/0.9.3/services.xml
@@ -24,5 +24,5 @@
<rewrite apply="STORM/storm/logs/outbound" to="response.body"/>
</route>
</routes>
- <dispatch classname="org.apache.hadoop.gateway.storm.StormHttpClientDispatch"/>
+ <dispatch classname="org.apache.hadoop.gateway.storm.StormDispatch"/>
</service>
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-definitions/src/main/resources/services/webhdfs/2.4.0/service.xml
----------------------------------------------------------------------
diff --git a/gateway-service-definitions/src/main/resources/services/webhdfs/2.4.0/service.xml b/gateway-service-definitions/src/main/resources/services/webhdfs/2.4.0/service.xml
index 2470175..f958b42 100644
--- a/gateway-service-definitions/src/main/resources/services/webhdfs/2.4.0/service.xml
+++ b/gateway-service-definitions/src/main/resources/services/webhdfs/2.4.0/service.xml
@@ -36,5 +36,5 @@
<dispatch contributor-name="http-client" />
</route>
</routes>
- <dispatch classname="org.apache.hadoop.gateway.hdfs.dispatch.HdfsDispatch" ha-classname="org.apache.hadoop.gateway.hdfs.dispatch.WebHdfsHaHttpClientDispatch"/>
+ <dispatch classname="org.apache.hadoop.gateway.hdfs.dispatch.HdfsHttpClientDispatch" ha-classname="org.apache.hadoop.gateway.hdfs.dispatch.WebHdfsHaDispatch"/>
</service>
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-definitions/src/test/java/org/apache/hadoop/gateway/service/definition/ServiceDefinitionTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-definitions/src/test/java/org/apache/hadoop/gateway/service/definition/ServiceDefinitionTest.java b/gateway-service-definitions/src/test/java/org/apache/hadoop/gateway/service/definition/ServiceDefinitionTest.java
index 5d1a544..f4a92cb 100644
--- a/gateway-service-definitions/src/test/java/org/apache/hadoop/gateway/service/definition/ServiceDefinitionTest.java
+++ b/gateway-service-definitions/src/test/java/org/apache/hadoop/gateway/service/definition/ServiceDefinitionTest.java
@@ -75,12 +75,12 @@ public class ServiceDefinitionTest {
url = ClassLoader.getSystemResource("services/hbase/0.98.0/service.xml");
definition = (ServiceDefinition) unmarshaller.unmarshal(url.openStream());
assertNotNull(definition.getDispatch());
- assertEquals("org.apache.hadoop.gateway.hbase.HBaseHttpClientDispatch", definition.getDispatch().getClassName());
+ assertEquals("org.apache.hadoop.gateway.hbase.HBaseDispatch", definition.getDispatch().getClassName());
url = ClassLoader.getSystemResource("services/webhdfs/2.4.0/service.xml");
definition = (ServiceDefinition) unmarshaller.unmarshal(url.openStream());
assertNotNull(definition.getDispatch());
- assertEquals("org.apache.hadoop.gateway.hdfs.dispatch.HdfsDispatch", definition.getDispatch().getClassName());
- assertEquals("org.apache.hadoop.gateway.hdfs.dispatch.WebHdfsHaHttpClientDispatch", definition.getDispatch().getHaClassName());
+ assertEquals("org.apache.hadoop.gateway.hdfs.dispatch.HdfsHttpClientDispatch", definition.getDispatch().getClassName());
+ assertEquals("org.apache.hadoop.gateway.hdfs.dispatch.WebHdfsHaDispatch", definition.getDispatch().getHaClassName());
}
}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatch.java b/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatch.java
new file mode 100644
index 0000000..b019aed
--- /dev/null
+++ b/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatch.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.hbase;
+
+import org.apache.hadoop.gateway.dispatch.DefaultDispatch;
+
+/**
+ * This specialized dispatch provides HBase specific features to the
+ * default dispatch.
+ */
+public class HBaseDispatch extends DefaultDispatch {
+
+ @Override
+ public void init() {
+ super.init();
+ setAppCookieManager(new HBaseCookieManager());
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseHttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseHttpClientDispatch.java b/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseHttpClientDispatch.java
index 0d39950..5f5025d 100644
--- a/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseHttpClientDispatch.java
+++ b/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseHttpClientDispatch.java
@@ -17,22 +17,22 @@
*/
package org.apache.hadoop.gateway.hbase;
-import org.apache.hadoop.gateway.dispatch.HttpClientDispatch;
+import org.apache.hadoop.gateway.dispatch.GatewayDispatchFilter;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
-/**
- * This specialized dispatch provides HBase specific features to the
- * default HttpClientDispatch.
+/***
+ * KNOX-526. Need to keep this class around for backward compatibility of deployed
+ * topologies. This is required for releases older than Apache Knox 0.6.0
*/
-public class HBaseHttpClientDispatch extends HttpClientDispatch {
+@Deprecated
+public class HBaseHttpClientDispatch extends GatewayDispatchFilter {
@Override
- public void init() {
- super.init();
- setAppCookieManager(new HBaseCookieManager());
+ public void init(FilterConfig filterConfig) throws ServletException {
+ setDispatch(new HBaseDispatch());
+ super.init(filterConfig);
}
-
}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatch.java b/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatch.java
new file mode 100644
index 0000000..06f2d3e
--- /dev/null
+++ b/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatch.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.hive;
+
+import org.apache.hadoop.gateway.config.Configure;
+import org.apache.hadoop.gateway.dispatch.DefaultDispatch;
+import org.apache.hadoop.gateway.security.PrimaryPrincipal;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.params.AuthPolicy;
+import org.apache.http.impl.auth.BasicScheme;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import javax.security.auth.Subject;
+import java.io.IOException;
+import java.security.AccessController;
+import java.security.Principal;
+
+/**
+ * This specialized dispatch provides Hive specific features to the
+ * default dispatch.
+ */
+public class HiveDispatch extends DefaultDispatch {
+ private static final String PASSWORD_PLACEHOLDER = "*";
+ private boolean basicAuthPreemptive = false;
+ private boolean kerberos = false;
+ private static final EmptyJaasCredentials EMPTY_JAAS_CREDENTIALS = new EmptyJaasCredentials();
+
+ @Override
+ public void init() {
+ super.init();
+ }
+
+ protected Principal getPrimaryPrincipal() {
+ Principal principal = null;
+ Subject subject = Subject.getSubject( AccessController.getContext());
+ if( subject != null ) {
+ principal = (Principal)subject.getPrincipals(PrimaryPrincipal.class).toArray()[0];
+ }
+ return principal;
+ }
+
+ protected void addCredentialsToRequest(HttpUriRequest request) {
+ if( isBasicAuthPreemptive() ) {
+ Principal principal = getPrimaryPrincipal();
+ if( principal != null ) {
+
+ UsernamePasswordCredentials credentials =
+ new UsernamePasswordCredentials( principal.getName(), PASSWORD_PLACEHOLDER );
+
+ request.addHeader(BasicScheme.authenticate(credentials,"US-ASCII",false));
+ }
+ }
+ }
+
+ @Configure
+ public void setBasicAuthPreemptive( boolean basicAuthPreemptive ) {
+ this.basicAuthPreemptive = basicAuthPreemptive;
+ }
+
+ public boolean isBasicAuthPreemptive() {
+ return basicAuthPreemptive;
+ }
+
+ public boolean isKerberos() {
+ return kerberos;
+ }
+
+ @Configure
+ public void setKerberos(boolean kerberos) {
+ this.kerberos = kerberos;
+ }
+
+ protected HttpResponse executeKerberosDispatch(HttpUriRequest outboundRequest,
+ DefaultHttpClient client) throws IOException {
+ //DefaultHttpClient client = new DefaultHttpClient();
+ SPNegoSchemeFactory spNegoSF = new SPNegoSchemeFactory(
+ /* stripPort */true);
+ // spNegoSF.setSpengoGenerator(new BouncySpnegoTokenGenerator());
+ client.getAuthSchemes().register(AuthPolicy.SPNEGO, spNegoSF);
+ client.getCredentialsProvider().setCredentials(
+ new AuthScope(/* host */null, /* port */-1, /* realm */null),
+ EMPTY_JAAS_CREDENTIALS);
+ return client.execute(outboundRequest);
+ }
+
+ private static class EmptyJaasCredentials implements Credentials {
+
+ public String getPassword() {
+ return null;
+ }
+
+ public Principal getUserPrincipal() {
+ return null;
+ }
+
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveHttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveHttpClientDispatch.java b/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveHttpClientDispatch.java
index a18baea..d44b123 100644
--- a/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveHttpClientDispatch.java
+++ b/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveHttpClientDispatch.java
@@ -17,103 +17,22 @@
*/
package org.apache.hadoop.gateway.hive;
-import org.apache.hadoop.gateway.config.Configure;
-import org.apache.hadoop.gateway.dispatch.HttpClientDispatch;
-import org.apache.hadoop.gateway.security.PrimaryPrincipal;
-import org.apache.http.HttpResponse;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.Credentials;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.params.AuthPolicy;
-import org.apache.http.impl.auth.BasicScheme;
-import org.apache.http.impl.auth.SPNegoSchemeFactory;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.hadoop.gateway.dispatch.GatewayDispatchFilter;
-import javax.security.auth.Subject;
-import java.io.IOException;
-import java.security.AccessController;
-import java.security.Principal;
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletException;
-/**
- * This specialized dispatch provides Hive specific features to the
- * default HttpClientDispatch.
+/***
+ * KNOX-526. Need to keep this class around for backward compatibility of deployed
+ * topologies. This is required for releases older than Apache Knox 0.6.0
*/
-public class HiveHttpClientDispatch extends HttpClientDispatch {
- private static final String PASSWORD_PLACEHOLDER = "*";
- private boolean basicAuthPreemptive = false;
- private boolean kerberos = false;
- private static final EmptyJaasCredentials EMPTY_JAAS_CREDENTIALS = new EmptyJaasCredentials();
+@Deprecated
+public class HiveHttpClientDispatch extends GatewayDispatchFilter {
@Override
- public void init() {
- super.init();
- }
-
- protected Principal getPrimaryPrincipal() {
- Principal principal = null;
- Subject subject = Subject.getSubject( AccessController.getContext());
- if( subject != null ) {
- principal = (Principal)subject.getPrincipals(PrimaryPrincipal.class).toArray()[0];
- }
- return principal;
- }
-
- protected void addCredentialsToRequest(HttpUriRequest request) {
- if( isBasicAuthPreemptive() ) {
- Principal principal = getPrimaryPrincipal();
- if( principal != null ) {
-
- UsernamePasswordCredentials credentials =
- new UsernamePasswordCredentials( principal.getName(), PASSWORD_PLACEHOLDER );
-
- request.addHeader(BasicScheme.authenticate(credentials,"US-ASCII",false));
- }
- }
- }
-
- @Configure
- public void setBasicAuthPreemptive( boolean basicAuthPreemptive ) {
- this.basicAuthPreemptive = basicAuthPreemptive;
- }
-
- public boolean isBasicAuthPreemptive() {
- return basicAuthPreemptive;
- }
-
- public boolean isKerberos() {
- return kerberos;
- }
-
- @Configure
- public void setKerberos(boolean kerberos) {
- this.kerberos = kerberos;
- }
-
- protected HttpResponse executeKerberosDispatch(HttpUriRequest outboundRequest,
- DefaultHttpClient client) throws IOException {
- //DefaultHttpClient client = new DefaultHttpClient();
- SPNegoSchemeFactory spNegoSF = new SPNegoSchemeFactory(
- /* stripPort */true);
- // spNegoSF.setSpengoGenerator(new BouncySpnegoTokenGenerator());
- client.getAuthSchemes().register(AuthPolicy.SPNEGO, spNegoSF);
- client.getCredentialsProvider().setCredentials(
- new AuthScope(/* host */null, /* port */-1, /* realm */null),
- EMPTY_JAAS_CREDENTIALS);
- return client.execute(outboundRequest);
- }
-
- private static class EmptyJaasCredentials implements Credentials {
-
- public String getPassword() {
- return null;
- }
-
- public Principal getUserPrincipal() {
- return null;
- }
-
+ public void init(FilterConfig filterConfig) throws ServletException {
+ setDispatch(new HiveDispatch());
+ super.init(filterConfig);
}
-
}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-storm/src/main/java/org/apache/hadoop/gateway/storm/StormDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-storm/src/main/java/org/apache/hadoop/gateway/storm/StormDispatch.java b/gateway-service-storm/src/main/java/org/apache/hadoop/gateway/storm/StormDispatch.java
new file mode 100644
index 0000000..abca519
--- /dev/null
+++ b/gateway-service-storm/src/main/java/org/apache/hadoop/gateway/storm/StormDispatch.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.storm;
+
+import org.apache.hadoop.gateway.dispatch.DefaultDispatch;
+
+import java.util.Set;
+
+/**
+ * This specialized dispatch provides Storm specific features to the
+ * default dispatch.
+ */
+public class StormDispatch extends DefaultDispatch {
+
+ @Override
+ public Set<String> getOutboundResponseExcludeHeaders() {
+ return null;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-storm/src/main/java/org/apache/hadoop/gateway/storm/StormHttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-storm/src/main/java/org/apache/hadoop/gateway/storm/StormHttpClientDispatch.java b/gateway-service-storm/src/main/java/org/apache/hadoop/gateway/storm/StormHttpClientDispatch.java
deleted file mode 100644
index 889c427..0000000
--- a/gateway-service-storm/src/main/java/org/apache/hadoop/gateway/storm/StormHttpClientDispatch.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.gateway.storm;
-
-import org.apache.hadoop.gateway.dispatch.HttpClientDispatch;
-
-import java.util.Set;
-
-/**
- * This specialized dispatch provides Storm specific features to the
- * default HttpClientDispatch.
- */
-public class StormHttpClientDispatch extends HttpClientDispatch {
-
- @Override
- public Set<String> getOutboundResponseExcludeHeaders() {
- return null;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/HdfsDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/HdfsDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/HdfsDispatch.java
index 2ccc41e..f4ec34e 100644
--- a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/HdfsDispatch.java
+++ b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/HdfsDispatch.java
@@ -17,29 +17,21 @@
*/
package org.apache.hadoop.gateway.hdfs.dispatch;
-import org.apache.hadoop.gateway.dispatch.HttpClientDispatch;
-import org.apache.http.HttpEntity;
+import org.apache.hadoop.gateway.dispatch.GatewayDispatchFilter;
+
+import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import java.io.IOException;
-public class HdfsDispatch extends HttpClientDispatch {
+/***
+ * KNOX-526. Need to keep this class around for backward compatibility of deployed
+ * topologies. This is required for releases older than Apache Knox 0.6.0
+ */
+@Deprecated
+public class HdfsDispatch extends GatewayDispatchFilter {
- public HdfsDispatch() throws ServletException {
- super();
+ @Override
+ public void init(FilterConfig filterConfig) throws ServletException {
+ setDispatch(new HdfsHttpClientDispatch());
+ super.init(filterConfig);
}
-
- //@Override
- /**
- * This method ensures that the request InputStream is not acquired
- * prior to a dispatch to a component such as a namenode that doesn't
- * the request body. The side effect of this is that the client does
- * not get a 100 continue from Knox which will trigger the client to
- * send the entire payload before redirect to the target component
- * like a datanode and have to send it again.
- */
- protected HttpEntity createRequestEntity(HttpServletRequest request)
- throws IOException {
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/HdfsHttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/HdfsHttpClientDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/HdfsHttpClientDispatch.java
new file mode 100644
index 0000000..c03de7a
--- /dev/null
+++ b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/HdfsHttpClientDispatch.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.hdfs.dispatch;
+
+import org.apache.hadoop.gateway.dispatch.DefaultDispatch;
+import org.apache.http.HttpEntity;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+
+public class HdfsHttpClientDispatch extends DefaultDispatch {
+
+ public HdfsHttpClientDispatch() throws ServletException {
+ super();
+ }
+
+ //@Override
+ /**
+ * This method ensures that the request InputStream is not acquired
+ * prior to a dispatch to a component such as a namenode that doesn't
+ * need the request body. The side effect of this is that the client does
+ * not get a 100 continue from Knox, which would trigger the client to
+ * send the entire payload before the redirect to the target component
+ * (such as a datanode) and then have to send it again.
+ */
+ protected HttpEntity createRequestEntity(HttpServletRequest request)
+ throws IOException {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaDispatch.java
new file mode 100644
index 0000000..d0bfd34
--- /dev/null
+++ b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaDispatch.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.hdfs.dispatch;
+
+import org.apache.hadoop.gateway.config.Configure;
+import org.apache.hadoop.gateway.filter.AbstractGatewayFilter;
+import org.apache.hadoop.gateway.ha.provider.HaProvider;
+import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.impl.HaServiceConfigConstants;
+import org.apache.hadoop.gateway.hdfs.i18n.WebHdfsMessages;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.BufferedHttpEntity;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class WebHdfsHaDispatch extends HdfsHttpClientDispatch {
+
+ private static final String FAILOVER_COUNTER_ATTRIBUTE = "dispatch.ha.failover.counter";
+
+ private static final String RETRY_COUNTER_ATTRIBUTE = "dispatch.ha.retry.counter";
+
+ public static final String RESOURCE_ROLE = "WEBHDFS";
+
+ private static final WebHdfsMessages LOG = MessagesFactory.get(WebHdfsMessages.class);
+
+ private int maxFailoverAttempts = HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS;
+
+ private int failoverSleep = HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP;
+
+ private int maxRetryAttempts = HaServiceConfigConstants.DEFAULT_MAX_RETRY_ATTEMPTS;
+
+ private int retrySleep = HaServiceConfigConstants.DEFAULT_RETRY_SLEEP;
+
+ private HaProvider haProvider;
+
+ /**
+ * @throws javax.servlet.ServletException
+ */
+ public WebHdfsHaDispatch() throws ServletException {
+ super();
+ }
+
+ @Override
+ public void init() {
+ super.init();
+ if (haProvider != null) {
+ HaServiceConfig serviceConfig = haProvider.getHaDescriptor().getServiceConfig(RESOURCE_ROLE);
+ maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts();
+ failoverSleep = serviceConfig.getFailoverSleep();
+ maxRetryAttempts = serviceConfig.getMaxRetryAttempts();
+ retrySleep = serviceConfig.getRetrySleep();
+ }
+ }
+
+ public HaProvider getHaProvider() {
+ return haProvider;
+ }
+
+ @Configure
+ public void setHaProvider(HaProvider haProvider) {
+ this.haProvider = haProvider;
+ }
+
+ @Override
+ protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException {
+ HttpResponse inboundResponse = null;
+ try {
+ inboundResponse = executeOutboundRequest(outboundRequest);
+ writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+ } catch (StandbyException e) {
+ LOG.errorReceivedFromStandbyNode(e);
+ failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
+ } catch (SafeModeException e) {
+ LOG.errorReceivedFromSafeModeNode(e);
+ retryRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
+ } catch (IOException e) {
+ LOG.errorConnectingToServer(outboundRequest.getURI().toString(), e);
+ failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
+ }
+ }
+
+ /**
+ * Checks for specific outbound response codes/content to trigger a retry or failover
+ */
+ @Override
+ protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException {
+ if (inboundResponse.getStatusLine().getStatusCode() == 403) {
+ BufferedHttpEntity entity = new BufferedHttpEntity(inboundResponse.getEntity());
+ inboundResponse.setEntity(entity);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ inboundResponse.getEntity().writeTo(outputStream);
+ String body = new String(outputStream.toByteArray());
+ if (body.contains("StandbyException")) {
+ throw new StandbyException();
+ }
+ if (body.contains("SafeModeException") || body.contains("RetriableException")) {
+ throw new SafeModeException();
+ }
+ }
+ super.writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+ }
+
+ private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException {
+ LOG.failingOverRequest(outboundRequest.getURI().toString());
+ AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
+ if (counter == null) {
+ counter = new AtomicInteger(0);
+ }
+ inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
+ if (counter.incrementAndGet() <= maxFailoverAttempts) {
+ haProvider.markFailedURL(RESOURCE_ROLE, outboundRequest.getURI().toString());
+ //null out target url so that rewriters run again
+ inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME, null);
+ URI uri = getDispatchUrl(inboundRequest);
+ ((HttpRequestBase) outboundRequest).setURI(uri);
+ if (failoverSleep > 0) {
+ try {
+ Thread.sleep(failoverSleep);
+ } catch (InterruptedException e) {
+ LOG.failoverSleepFailed(RESOURCE_ROLE, e);
+ }
+ }
+ executeRequest(outboundRequest, inboundRequest, outboundResponse);
+ } else {
+ LOG.maxFailoverAttemptsReached(maxFailoverAttempts, RESOURCE_ROLE);
+ if (inboundResponse != null) {
+ writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+ } else {
+ throw new IOException(exception);
+ }
+ }
+ }
+
+ private void retryRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException {
+ LOG.retryingRequest(outboundRequest.getURI().toString());
+ AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(RETRY_COUNTER_ATTRIBUTE);
+ if (counter == null) {
+ counter = new AtomicInteger(0);
+ }
+ inboundRequest.setAttribute(RETRY_COUNTER_ATTRIBUTE, counter);
+ if (counter.incrementAndGet() <= maxRetryAttempts) {
+ if (retrySleep > 0) {
+ try {
+ Thread.sleep(retrySleep);
+ } catch (InterruptedException e) {
+ LOG.retrySleepFailed(RESOURCE_ROLE, e);
+ }
+ }
+ executeRequest(outboundRequest, inboundRequest, outboundResponse);
+ } else {
+ LOG.maxRetryAttemptsReached(maxRetryAttempts, RESOURCE_ROLE, outboundRequest.getURI().toString());
+ if (inboundResponse != null) {
+ writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
+ } else {
+ throw new IOException(exception);
+ }
+ }
+ }
+
+ private static URI getDispatchUrl(HttpServletRequest request) {
+ StringBuffer str = request.getRequestURL();
+ String query = request.getQueryString();
+ if ( query != null ) {
+ str.append('?');
+ str.append(query);
+ }
+ URI url = URI.create(str.toString());
+ return url;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
index f21b6e9..bb1e623 100644
--- a/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
+++ b/gateway-service-webhdfs/src/main/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatch.java
@@ -17,181 +17,22 @@
*/
package org.apache.hadoop.gateway.hdfs.dispatch;
-import org.apache.hadoop.gateway.config.Configure;
-import org.apache.hadoop.gateway.filter.AbstractGatewayFilter;
-import org.apache.hadoop.gateway.ha.provider.HaProvider;
-import org.apache.hadoop.gateway.ha.provider.HaServiceConfig;
-import org.apache.hadoop.gateway.ha.provider.HaServletContextListener;
-import org.apache.hadoop.gateway.ha.provider.impl.HaServiceConfigConstants;
-import org.apache.hadoop.gateway.hdfs.i18n.WebHdfsMessages;
-import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.hadoop.gateway.dispatch.GatewayDispatchFilter;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.atomic.AtomicInteger;
-public class WebHdfsHaHttpClientDispatch extends HdfsDispatch {
- private static final String FAILOVER_COUNTER_ATTRIBUTE = "dispatch.ha.failover.counter";
-
- private static final String RETRY_COUNTER_ATTRIBUTE = "dispatch.ha.retry.counter";
-
- public static final String RESOURCE_ROLE = "WEBHDFS";
-
- private static final WebHdfsMessages LOG = MessagesFactory.get(WebHdfsMessages.class);
-
- private int maxFailoverAttempts = HaServiceConfigConstants.DEFAULT_MAX_FAILOVER_ATTEMPTS;
-
- private int failoverSleep = HaServiceConfigConstants.DEFAULT_FAILOVER_SLEEP;
-
- private int maxRetryAttempts = HaServiceConfigConstants.DEFAULT_MAX_RETRY_ATTEMPTS;
-
- private int retrySleep = HaServiceConfigConstants.DEFAULT_RETRY_SLEEP;
-
- private HaProvider haProvider;
-
- /**
- * @throws ServletException
- */
- public WebHdfsHaHttpClientDispatch() throws ServletException {
- super();
- }
-
- @Override
- public void init() {
- super.init();
- if (haProvider != null) {
- HaServiceConfig serviceConfig = haProvider.getHaDescriptor().getServiceConfig(RESOURCE_ROLE);
- maxFailoverAttempts = serviceConfig.getMaxFailoverAttempts();
- failoverSleep = serviceConfig.getFailoverSleep();
- maxRetryAttempts = serviceConfig.getMaxRetryAttempts();
- retrySleep = serviceConfig.getRetrySleep();
- }
- }
-
- public HaProvider getHaProvider() {
- return haProvider;
- }
-
- @Configure
- public void setHaProvider(HaProvider haProvider) {
- this.haProvider = haProvider;
- }
+/**
+ * KNOX-526: this class is kept only for backward compatibility of deployed topologies.
+ * It is required by topologies from releases older than Apache Knox 0.6.0.
+ */
+@Deprecated
+public class WebHdfsHaHttpClientDispatch extends GatewayDispatchFilter {
@Override
- protected void executeRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException {
- HttpResponse inboundResponse = null;
- try {
- inboundResponse = executeOutboundRequest(outboundRequest);
- writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
- } catch (StandbyException e) {
- LOG.errorReceivedFromStandbyNode(e);
- failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
- } catch (SafeModeException e) {
- LOG.errorReceivedFromSafeModeNode(e);
- retryRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
- } catch (IOException e) {
- LOG.errorConnectingToServer(outboundRequest.getURI().toString(), e);
- failoverRequest(outboundRequest, inboundRequest, outboundResponse, inboundResponse, e);
- }
- }
-
- /**
- * Checks for specific outbound response codes/content to trigger a retry or failover
- */
- @Override
- protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException {
- if (inboundResponse.getStatusLine().getStatusCode() == 403) {
- BufferedHttpEntity entity = new BufferedHttpEntity(inboundResponse.getEntity());
- inboundResponse.setEntity(entity);
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- inboundResponse.getEntity().writeTo(outputStream);
- String body = new String(outputStream.toByteArray());
- if (body.contains("StandbyException")) {
- throw new StandbyException();
- }
- if (body.contains("SafeModeException") || body.contains("RetriableException")) {
- throw new SafeModeException();
- }
- }
- super.writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
- }
-
- private void failoverRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException {
- LOG.failingOverRequest(outboundRequest.getURI().toString());
- AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(FAILOVER_COUNTER_ATTRIBUTE);
- if (counter == null) {
- counter = new AtomicInteger(0);
- }
- inboundRequest.setAttribute(FAILOVER_COUNTER_ATTRIBUTE, counter);
- if (counter.incrementAndGet() <= maxFailoverAttempts) {
- haProvider.markFailedURL(RESOURCE_ROLE, outboundRequest.getURI().toString());
- //null out target url so that rewriters run again
- inboundRequest.setAttribute(AbstractGatewayFilter.TARGET_REQUEST_URL_ATTRIBUTE_NAME, null);
- URI uri = getDispatchUrl(inboundRequest);
- ((HttpRequestBase) outboundRequest).setURI(uri);
- if (failoverSleep > 0) {
- try {
- Thread.sleep(failoverSleep);
- } catch (InterruptedException e) {
- LOG.failoverSleepFailed(RESOURCE_ROLE, e);
- }
- }
- executeRequest(outboundRequest, inboundRequest, outboundResponse);
- } else {
- LOG.maxFailoverAttemptsReached(maxFailoverAttempts, RESOURCE_ROLE);
- if (inboundResponse != null) {
- writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
- } else {
- throw new IOException(exception);
- }
- }
- }
-
- private void retryRequest(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse, Exception exception) throws IOException {
- LOG.retryingRequest(outboundRequest.getURI().toString());
- AtomicInteger counter = (AtomicInteger) inboundRequest.getAttribute(RETRY_COUNTER_ATTRIBUTE);
- if (counter == null) {
- counter = new AtomicInteger(0);
- }
- inboundRequest.setAttribute(RETRY_COUNTER_ATTRIBUTE, counter);
- if (counter.incrementAndGet() <= maxRetryAttempts) {
- if (retrySleep > 0) {
- try {
- Thread.sleep(retrySleep);
- } catch (InterruptedException e) {
- LOG.retrySleepFailed(RESOURCE_ROLE, e);
- }
- }
- executeRequest(outboundRequest, inboundRequest, outboundResponse);
- } else {
- LOG.maxRetryAttemptsReached(maxRetryAttempts, RESOURCE_ROLE, outboundRequest.getURI().toString());
- if (inboundResponse != null) {
- writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, inboundResponse);
- } else {
- throw new IOException(exception);
- }
- }
- }
-
- private static URI getDispatchUrl(HttpServletRequest request) {
- StringBuffer str = request.getRequestURL();
- String query = request.getQueryString();
- if ( query != null ) {
- str.append('?');
- str.append(query);
- }
- URI url = URI.create(str.toString());
- return url;
+ public void init(FilterConfig filterConfig) throws ServletException {
+ setDispatch(new WebHdfsHaDispatch());
+ super.init(filterConfig);
}
-
}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaDispatchTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaDispatchTest.java b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaDispatchTest.java
new file mode 100644
index 0000000..422218f
--- /dev/null
+++ b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaDispatchTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.hdfs.dispatch;
+
+import org.apache.hadoop.gateway.dispatch.AppCookieManager;
+import org.apache.hadoop.gateway.ha.provider.HaDescriptor;
+import org.apache.hadoop.gateway.ha.provider.HaProvider;
+import org.apache.hadoop.gateway.ha.provider.HaServletContextListener;
+import org.apache.hadoop.gateway.ha.provider.impl.DefaultHaDescriptor;
+import org.apache.hadoop.gateway.ha.provider.impl.DefaultHaProvider;
+import org.apache.hadoop.gateway.ha.provider.impl.DefaultHaServiceConfig;
+import org.apache.hadoop.gateway.ha.provider.impl.HaDescriptorFactory;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.servlet.FilterConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Tests for {@link WebHdfsHaDispatch}: initialization and failover to the next configured URL
+ * when the active one cannot be reached.
+ */
+public class WebHdfsHaDispatchTest {
+
+  /**
+   * Exposes the inherited {@code appCookieManager} field so a test can inspect it.
+   * Declared static because it needs no reference to the enclosing test instance.
+   */
+  private static class InstrumentedWebHdfsHaDispatch extends WebHdfsHaDispatch {
+
+    public InstrumentedWebHdfsHaDispatch() throws ServletException {
+    }
+
+    public AppCookieManager getAppCookieManager() {
+      return appCookieManager;
+    }
+
+  }
+
+  /**
+   * Verifies that {@code init()} leaves the dispatch with a non-null cookie manager.
+   */
+  @Test
+  public void testInitCallsSuperInit() throws Exception {
+    // NOTE(review): the HA provider and the servlet mocks below are never handed to the
+    // dispatch (init() takes no arguments) -- confirm whether this setup is still needed.
+    DefaultHaDescriptor haDescriptor = new DefaultHaDescriptor();
+    haDescriptor.addServiceConfig( new DefaultHaServiceConfig( "test-role" ) );
+    HaProvider haProvider = new DefaultHaProvider( haDescriptor );
+    ServletContext context = EasyMock.createNiceMock(ServletContext.class);
+    EasyMock.expect(context.getAttribute(HaServletContextListener.PROVIDER_ATTRIBUTE_NAME)).andReturn(haProvider).anyTimes();
+    FilterConfig config = EasyMock.createNiceMock( FilterConfig.class );
+    EasyMock.expect(config.getServletContext()).andReturn(context).anyTimes();
+    EasyMock.expect(config.getInitParameter(EasyMock.anyObject(String.class))).andReturn(null).anyTimes();
+    InstrumentedWebHdfsHaDispatch dispatch = new InstrumentedWebHdfsHaDispatch();
+    EasyMock.replay(context,config);
+
+    dispatch.init();
+
+    assertThat( dispatch.getAppCookieManager(), notNullValue() );
+  }
+
+  /**
+   * Dispatches to an unreachable host and verifies that the provider's active URL moves to the
+   * second configured URL and that at least one second elapsed, i.e. the sleep took place.
+   */
+  @Test
+  public void testConnectivityFailover() throws Exception {
+    String serviceName = "WEBHDFS";
+    HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
+    // NOTE(review): the string arguments are presumably enabled, max failover attempts,
+    // failover sleep, max retry attempts and retry sleep -- confirm against HaDescriptorFactory.
+    descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", "2", "1000"));
+    HaProvider provider = new DefaultHaProvider(descriptor);
+    URI uri1 = new URI( "http://unreachable-host" );
+    URI uri2 = new URI( "http://reachable-host" );
+    ArrayList<String> urlList = new ArrayList<String>();
+    urlList.add(uri1.toString());
+    urlList.add(uri2.toString());
+    provider.addHaService(serviceName, urlList);
+    FilterConfig filterConfig = EasyMock.createNiceMock(FilterConfig.class);
+    ServletContext servletContext = EasyMock.createNiceMock(ServletContext.class);
+
+    EasyMock.expect(filterConfig.getServletContext()).andReturn(servletContext).anyTimes();
+    EasyMock.expect(servletContext.getAttribute(HaServletContextListener.PROVIDER_ATTRIBUTE_NAME)).andReturn(provider).anyTimes();
+
+    BasicHttpParams params = new BasicHttpParams();
+
+    HttpUriRequest outboundRequest = EasyMock.createNiceMock(HttpRequestBase.class);
+    EasyMock.expect(outboundRequest.getMethod()).andReturn( "GET" ).anyTimes();
+    EasyMock.expect(outboundRequest.getURI()).andReturn( uri1 ).anyTimes();
+    EasyMock.expect(outboundRequest.getParams()).andReturn( params ).anyTimes();
+
+    HttpServletRequest inboundRequest = EasyMock.createNiceMock(HttpServletRequest.class);
+    EasyMock.expect(inboundRequest.getRequestURL()).andReturn( new StringBuffer(uri2.toString()) ).once();
+    EasyMock.expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new AtomicInteger(0)).once();
+    EasyMock.expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new AtomicInteger(1)).once();
+
+    HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class);
+    EasyMock.expect(outboundResponse.getOutputStream()).andAnswer( new IAnswer<ServletOutputStream>() {
+      @Override
+      public ServletOutputStream answer() throws Throwable {
+        return new ServletOutputStream() {
+          @Override
+          public void write( int b ) throws IOException {
+            throw new IOException( "unreachable-host" );
+          }
+        };
+      }
+    }).once();
+    EasyMock.replay(filterConfig, servletContext, outboundRequest, inboundRequest, outboundResponse);
+    Assert.assertEquals(uri1.toString(), provider.getActiveURL(serviceName));
+    WebHdfsHaDispatch dispatch = new WebHdfsHaDispatch();
+    dispatch.setHttpClient(new DefaultHttpClient());
+    dispatch.setHaProvider(provider);
+    dispatch.init();
+    // System.nanoTime() is monotonic, so wall-clock adjustments cannot distort the measurement.
+    long startNanos = System.nanoTime();
+    try {
+      dispatch.executeRequest(outboundRequest, inboundRequest, outboundResponse);
+    } catch (IOException e) {
+      //this is expected after the failover limit is reached
+    }
+    long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
+    Assert.assertEquals(uri2.toString(), provider.getActiveURL(serviceName));
+    //test to make sure the sleep took place
+    Assert.assertTrue("expected at least 1000 ms to elapse but was " + elapsedMillis, elapsedMillis >= 1000);
+  }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java b/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
deleted file mode 100644
index 757288f..0000000
--- a/gateway-service-webhdfs/src/test/java/org/apache/hadoop/gateway/hdfs/dispatch/WebHdfsHaHttpClientDispatchTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.gateway.hdfs.dispatch;
-
-import org.apache.hadoop.gateway.dispatch.AppCookieManager;
-import org.apache.hadoop.gateway.ha.provider.HaDescriptor;
-import org.apache.hadoop.gateway.ha.provider.HaProvider;
-import org.apache.hadoop.gateway.ha.provider.HaServletContextListener;
-import org.apache.hadoop.gateway.ha.provider.impl.DefaultHaDescriptor;
-import org.apache.hadoop.gateway.ha.provider.impl.DefaultHaProvider;
-import org.apache.hadoop.gateway.ha.provider.impl.DefaultHaServiceConfig;
-import org.apache.hadoop.gateway.ha.provider.impl.HaDescriptorFactory;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.params.BasicHttpParams;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.servlet.FilterConfig;
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-public class WebHdfsHaHttpClientDispatchTest {
-
- private class InstrumentedWebHdfsHaHttpClientDispatch extends WebHdfsHaHttpClientDispatch {
-
- public InstrumentedWebHdfsHaHttpClientDispatch() throws ServletException {
- }
-
- public AppCookieManager getAppCookieManager() {
- return appCookieManager;
- }
-
- }
-
- @Test
- public void testInitCallsSuperInit() throws Exception {
- DefaultHaDescriptor haDescriptor = new DefaultHaDescriptor();
- haDescriptor.addServiceConfig( new DefaultHaServiceConfig( "test-role" ) );
- HaProvider haProvider = new DefaultHaProvider( haDescriptor );
- ServletContext context = EasyMock.createNiceMock(ServletContext.class);
- EasyMock.expect(context.getAttribute(HaServletContextListener.PROVIDER_ATTRIBUTE_NAME)).andReturn(haProvider).anyTimes();
- FilterConfig config = EasyMock.createNiceMock( FilterConfig.class );
- EasyMock.expect(config.getServletContext()).andReturn(context).anyTimes();
- EasyMock.expect(config.getInitParameter(EasyMock.anyObject(String.class))).andReturn(null).anyTimes();
- InstrumentedWebHdfsHaHttpClientDispatch dispatch = new InstrumentedWebHdfsHaHttpClientDispatch();
- EasyMock.replay(context,config);
-
- dispatch.init();
-
- assertThat( dispatch.getAppCookieManager(), notNullValue() );
- }
-
- @Test
- public void testConnectivityFailover() throws Exception {
- String serviceName = "WEBHDFS";
- HaDescriptor descriptor = HaDescriptorFactory.createDescriptor();
- descriptor.addServiceConfig(HaDescriptorFactory.createServiceConfig(serviceName, "true", "1", "1000", "2", "1000"));
- HaProvider provider = new DefaultHaProvider(descriptor);
- URI uri1 = new URI( "http://unreachable-host" );
- URI uri2 = new URI( "http://reachable-host" );
- ArrayList<String> urlList = new ArrayList<String>();
- urlList.add(uri1.toString());
- urlList.add(uri2.toString());
- provider.addHaService(serviceName, urlList);
- FilterConfig filterConfig = EasyMock.createNiceMock(FilterConfig.class);
- ServletContext servletContext = EasyMock.createNiceMock(ServletContext.class);
-
- EasyMock.expect(filterConfig.getServletContext()).andReturn(servletContext).anyTimes();
- EasyMock.expect(servletContext.getAttribute(HaServletContextListener.PROVIDER_ATTRIBUTE_NAME)).andReturn(provider).anyTimes();
-
- BasicHttpParams params = new BasicHttpParams();
-
- HttpUriRequest outboundRequest = EasyMock.createNiceMock(HttpRequestBase.class);
- EasyMock.expect(outboundRequest.getMethod()).andReturn( "GET" ).anyTimes();
- EasyMock.expect(outboundRequest.getURI()).andReturn( uri1 ).anyTimes();
- EasyMock.expect(outboundRequest.getParams()).andReturn( params ).anyTimes();
-
- HttpServletRequest inboundRequest = EasyMock.createNiceMock(HttpServletRequest.class);
- EasyMock.expect(inboundRequest.getRequestURL()).andReturn( new StringBuffer(uri2.toString()) ).once();
- EasyMock.expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new AtomicInteger(0)).once();
- EasyMock.expect(inboundRequest.getAttribute("dispatch.ha.failover.counter")).andReturn(new AtomicInteger(1)).once();
-
- HttpServletResponse outboundResponse = EasyMock.createNiceMock(HttpServletResponse.class);
- EasyMock.expect(outboundResponse.getOutputStream()).andAnswer( new IAnswer<ServletOutputStream>() {
- @Override
- public ServletOutputStream answer() throws Throwable {
- return new ServletOutputStream() {
- @Override
- public void write( int b ) throws IOException {
- throw new IOException( "unreachable-host" );
- }
- };
- }
- }).once();
- EasyMock.replay(filterConfig, servletContext, outboundRequest, inboundRequest, outboundResponse);
- Assert.assertEquals(uri1.toString(), provider.getActiveURL(serviceName));
- WebHdfsHaHttpClientDispatch dispatch = new WebHdfsHaHttpClientDispatch();
- dispatch.setHttpClient(new DefaultHttpClient());
- dispatch.setHaProvider(provider);
- dispatch.init();
- long startTime = System.currentTimeMillis();
- try {
- dispatch.executeRequest(outboundRequest, inboundRequest, outboundResponse);
- } catch (IOException e) {
- //this is expected after the failover limit is reached
- }
- long elapsedTime = System.currentTimeMillis() - startTime;
- Assert.assertEquals(uri2.toString(), provider.getActiveURL(serviceName));
- //test to make sure the sleep took place
- Assert.assertTrue(elapsedTime > 1000);
- }
-}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/AbstractGatewayDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/AbstractGatewayDispatch.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/AbstractGatewayDispatch.java
index 0aeebfb..b5ff6d4 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/AbstractGatewayDispatch.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/AbstractGatewayDispatch.java
@@ -99,8 +99,8 @@ public abstract class AbstractGatewayDispatch implements Dispatch {
String name = headerNames.nextElement();
if ( !outboundRequest.containsHeader( name )
&& !EXCLUDE_HEADERS.contains( name ) ) {
- String vaule = inboundRequest.getHeader( name );
- outboundRequest.addHeader( name, vaule );
+ String value = inboundRequest.getHeader( name );
+ outboundRequest.addHeader( name, value );
}
}
}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/DefaultDispatch.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/DefaultDispatch.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/DefaultDispatch.java
new file mode 100644
index 0000000..1ee9e49
--- /dev/null
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/DefaultDispatch.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.gateway.dispatch;
+
+import org.apache.hadoop.gateway.SpiGatewayMessages;
+import org.apache.hadoop.gateway.SpiGatewayResources;
+import org.apache.hadoop.gateway.audit.api.Action;
+import org.apache.hadoop.gateway.audit.api.ActionOutcome;
+import org.apache.hadoop.gateway.audit.api.AuditServiceFactory;
+import org.apache.hadoop.gateway.audit.api.Auditor;
+import org.apache.hadoop.gateway.audit.api.ResourceType;
+import org.apache.hadoop.gateway.audit.log4j.audit.AuditConstants;
+import org.apache.hadoop.gateway.config.Configure;
+import org.apache.hadoop.gateway.config.Default;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.i18n.resources.ResourcesFactory;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.message.BasicHeader;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
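+/**
+ * Dispatch implementation that executes outbound requests through an HTTP client and copies
+ * the status, headers and entity of the result onto the outbound servlet response.
+ */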
+public class DefaultDispatch extends AbstractGatewayDispatch {
+
+ // private static final String CT_APP_WWW_FORM_URL_ENCODED = "application/x-www-form-urlencoded";
+ // private static final String CT_APP_XML = "application/xml";
+ protected static final String Q_DELEGATION_EQ = "?delegation=";
+ protected static final String AMP_DELEGATION_EQ = "&delegation=";
+ protected static final String COOKIE = "Cookie";
+ protected static final String SET_COOKIE = "Set-Cookie";
+ protected static final String WWW_AUTHENTICATE = "WWW-Authenticate";
+ protected static final String NEGOTIATE = "Negotiate";
+
+ protected static SpiGatewayMessages LOG = MessagesFactory.get(SpiGatewayMessages.class);
+ protected static SpiGatewayResources RES = ResourcesFactory.get(SpiGatewayResources.class);
+ protected static Auditor auditor = AuditServiceFactory.getAuditService().getAuditor(AuditConstants.DEFAULT_AUDITOR_NAME,
+ AuditConstants.KNOX_SERVICE_NAME, AuditConstants.KNOX_COMPONENT_NAME);
+
+ protected AppCookieManager appCookieManager;
+
+ private int replayBufferSize = 0;
+ private Set<String> outboundResponseExcludeHeaders;
+
+ /**
+  * Installs a new AppCookieManager and sets up the response headers that are not copied to
+  * the outbound response: Set-Cookie and WWW-Authenticate.
+  */
+ @Override
+ public void init() {
+   setAppCookieManager(new AppCookieManager());
+   Set<String> excluded = new HashSet<String>();
+   excluded.add(SET_COOKIE);
+   excluded.add(WWW_AUTHENTICATE);
+   outboundResponseExcludeHeaders = excluded;
+ }
+
+ @Override
+ public void destroy() {
+ // No-op: this implementation performs no cleanup.
+ }
+
+ public void setAppCookieManager(AppCookieManager appCookieManager) {
+ this.appCookieManager = appCookieManager; // init() installs a new AppCookieManager through this setter
+ }
+
+ /**
+  * Sends the outbound request to the backend and writes the backend's answer to the
+  * outbound response.
+  *
+  * @param outboundRequest the request to execute against the backend
+  * @param inboundRequest the original client request
+  * @param outboundResponse the response being produced for the client
+  * @throws IOException if executing the request or writing the response fails
+  */
+ protected void executeRequest(
+     HttpUriRequest outboundRequest,
+     HttpServletRequest inboundRequest,
+     HttpServletResponse outboundResponse)
+     throws IOException {
+   HttpResponse backendResponse = executeOutboundRequest(outboundRequest);
+   writeOutboundResponse(outboundRequest, inboundRequest, outboundResponse, backendResponse);
+ }
+
+ /**
+  * Executes the outbound request against the backend and audits the outcome.
+  *
+  * When the GatewayConfig.HADOOP_KERBEROS_SECURED system property is not "true", credentials
+  * are added through addCredentialsToRequest and the request is executed directly. Otherwise a
+  * request whose query string already carries a delegation token is executed as is, and any
+  * other request goes through executeKerberosDispatch.
+  *
+  * @param outboundRequest the request to send to the backend
+  * @return the backend response
+  * @throws IOException carrying only a generic message if executing the request fails with an
+  *         I/O error; the original exception is logged but not chained
+  */
+ protected HttpResponse executeOutboundRequest(HttpUriRequest outboundRequest) throws IOException {
+   LOG.dispatchRequest(outboundRequest.getMethod(), outboundRequest.getURI());
+   HttpResponse inboundResponse = null;
+
+   try {
+     if (!"true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
+       // Hadoop cluster not Kerberos enabled
+       addCredentialsToRequest(outboundRequest);
+       inboundResponse = client.execute(outboundRequest);
+     } else {
+       // URI.getQuery() returns null when the URI has no query and never includes the leading
+       // '?', so a token in the first parameter has to be matched with startsWith.
+       String query = outboundRequest.getURI().getQuery();
+       boolean delegationTokenPresent = query != null
+           && (query.startsWith("delegation=")
+               || query.contains(Q_DELEGATION_EQ)
+               || query.contains(AMP_DELEGATION_EQ));
+       if (delegationTokenPresent) {
+         // query string carries delegation token
+         inboundResponse = client.execute(outboundRequest);
+       } else {
+         // Kerberos secured, no delegation token in query string
+         inboundResponse = executeKerberosDispatch(outboundRequest, client);
+       }
+     }
+   } catch (IOException e) {
+     // we do not want to expose back end host:port endpoints to clients, see JIRA KNOX-58
+     LOG.dispatchServiceConnectionException(outboundRequest.getURI(), e);
+     auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.FAILURE);
+     throw new IOException(RES.dispatchConnectionError());
+   } finally {
+     // NOTE(review): after an IOException this block also records UNAVAILABLE, in addition to
+     // the FAILURE audited in the catch block above -- confirm that both entries are wanted.
+     if (inboundResponse != null) {
+       int statusCode = inboundResponse.getStatusLine().getStatusCode();
+       if (statusCode != 201) {
+         LOG.dispatchResponseStatusCode(statusCode);
+       } else {
+         Header location = inboundResponse.getFirstHeader("Location");
+         if (location == null) {
+           LOG.dispatchResponseStatusCode(statusCode);
+         } else {
+           LOG.dispatchResponseCreatedStatusCode(statusCode, location.getValue());
+         }
+       }
+       auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.SUCCESS, RES.responseStatus(statusCode));
+     } else {
+       auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.UNAVAILABLE);
+     }
+   }
+   return inboundResponse;
+ }
+
+ /**
+  * Copies the backend response (status, headers, content type and body) to the outbound
+  * servlet response.
+  *
+  * Headers whose names match an entry of getOutboundResponseExcludeHeaders() are not copied.
+  * The match ignores case because HTTP header field names are case-insensitive.
+  *
+  * @param outboundRequest the request that was sent to the backend
+  * @param inboundRequest the original client request
+  * @param outboundResponse the response being produced for the client
+  * @param inboundResponse the response received from the backend
+  * @throws IOException if reading the backend entity or writing the response fails
+  */
+ protected void writeOutboundResponse(HttpUriRequest outboundRequest, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse, HttpResponse inboundResponse) throws IOException {
+   // Copy the backend response status and headers to the gateway response.
+   outboundResponse.setStatus(inboundResponse.getStatusLine().getStatusCode());
+   Header[] headers = inboundResponse.getAllHeaders();
+   Set<String> excludeHeaders = getOutboundResponseExcludeHeaders();
+   for ( Header header : headers ) {
+     String name = header.getName();
+     boolean excluded = false;
+     if (excludeHeaders != null) {
+       for (String excludedName : excludeHeaders) {
+         if (excludedName != null && excludedName.equalsIgnoreCase(name)) {
+           excluded = true;
+           break;
+         }
+       }
+     }
+     if (!excluded) {
+       outboundResponse.addHeader(name, header.getValue());
+     }
+   }
+
+   HttpEntity entity = inboundResponse.getEntity();
+   if ( entity != null ) {
+     Header contentType = entity.getContentType();
+     if ( contentType != null ) {
+       outboundResponse.setContentType(contentType.getValue());
+     }
+     //KM[ The content length is deliberately not set from the entity here: that would be the
+     // length of the content returned from the server, which might not match if the content
+     // is rewritten.
+     //]
+     writeResponse(inboundRequest, outboundResponse, entity.getContent());
+   }
+ }
+
+ /**
+ * Hook for specialized credential propagation in subclasses; this implementation adds nothing.
+ * executeOutboundRequest calls it before executing a request when the Kerberos flag is not "true".
+ *
+ * @param outboundRequest the request about to be sent, to which credentials may be added
+ */
+ protected void addCredentialsToRequest(HttpUriRequest outboundRequest) {
+ }
+
+ /**
+  * Executes the request with the cached application cookie and, if the backend answers
+  * 401 with a WWW-Authenticate: Negotiate challenge, refreshes the cookie and tries once more.
+  *
+  * @param outboundRequest the request to send; its Cookie headers are replaced
+  * @param client the client used for the first attempt
+  * @return the response of the first attempt, or of the second attempt if one was made
+  * @throws IOException if executing the request or obtaining the cookie fails
+  */
+ protected HttpResponse executeKerberosDispatch(HttpUriRequest outboundRequest,
+     HttpClient client) throws IOException {
+   outboundRequest.removeHeaders(COOKIE);
+   String appCookie = appCookieManager.getCachedAppCookie();
+   if (appCookie != null) {
+     outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie));
+   }
+   HttpResponse firstResponse = client.execute(outboundRequest);
+
+   if (firstResponse.getStatusLine().getStatusCode() != HttpStatus.SC_UNAUTHORIZED) {
+     // not a 401 Unauthorized status code: let the original response propagate
+     return firstResponse;
+   }
+
+   Header[] challenges = firstResponse.getHeaders(WWW_AUTHENTICATE);
+   boolean negotiateRequested = challenges != null && challenges.length != 0
+       && challenges[0].getValue().trim().startsWith(NEGOTIATE);
+   if (!negotiateRequested) {
+     // no supported authentication type found: let the original response propagate
+     return firstResponse;
+   }
+
+   // 401 with WWW-Authenticate: Negotiate -- refresh the app cookie and attempt one more time
+   appCookie = appCookieManager.getAppCookie(outboundRequest, true);
+   outboundRequest.removeHeaders(COOKIE);
+   outboundRequest.addHeader(new BasicHeader(COOKIE, appCookie));
+   // NOTE(review): the second attempt uses a brand-new DefaultHttpClient that is never shut
+   // down here, and the first response's entity is not consumed -- confirm this is intended.
+   HttpClient retryClient = new DefaultHttpClient();
+   return retryClient.execute(outboundRequest);
+ }
+
+ /**
+  * Builds the entity for an outbound request from the body of the inbound
+  * servlet request.
+  *
+  * The body stream is wrapped in an {@link InputStreamEntity} using the
+  * request's content length and, when one is declared, its content type.
+  * If the system property named by {@code GatewayConfig.HADOOP_KERBEROS_SECURED}
+  * is {@code "true"}, the query string carries no {@code delegation} parameter
+  * and the replay buffer size is positive, that entity is further wrapped in a
+  * {@link CappedBufferHttpEntity} sized at {@code getReplayBufferSize() * 1024}.
+  *
+  * @param request the inbound servlet request whose body is forwarded
+  * @return the entity to attach to the outbound request
+  * @throws IOException if the request's input stream cannot be obtained
+  */
+ protected HttpEntity createRequestEntity(HttpServletRequest request)
+     throws IOException {
+   String mediaType = request.getContentType();
+   int length = request.getContentLength();
+   InputStream body = request.getInputStream();
+
+   HttpEntity entity = (mediaType == null)
+       ? new InputStreamEntity(body, length)
+       : new InputStreamEntity(body, length, ContentType.parse(mediaType));
+
+   // Outside of Kerberos-secured mode the plain streaming entity is used as-is.
+   if (!"true".equals(System.getProperty(GatewayConfig.HADOOP_KERBEROS_SECURED))) {
+     return entity;
+   }
+
+   // Check whether a delegation token was supplied in the query string.
+   String query = request.getQueryString();
+   boolean hasDelegationToken = query != null
+       && (query.startsWith("delegation=") || query.contains("&delegation="));
+   if (hasDelegationToken || getReplayBufferSize() <= 0) {
+     return entity;
+   }
+
+   return new CappedBufferHttpEntity(entity, getReplayBufferSize() * 1024);
+ }
+
+ /**
+  * Dispatches an HTTP GET to {@code url}, forwarding the inbound request's
+  * header fields, with automatic redirect handling turned off.
+  *
+  * @param url the backend URL to call
+  * @param request the inbound client request
+  * @param response the response to be populated for the client
+  * @throws IOException if dispatching the request fails
+  * @throws URISyntaxException declared for callers of this dispatch method
+  */
+ @Override
+ public void doGet(URI url, HttpServletRequest request, HttpServletResponse response)
+     throws IOException, URISyntaxException {
+   HttpGet get = new HttpGet(url);
+   // KNOX-107 (https://issues.apache.org/jira/browse/KNOX-107):
+   // Service URLs not rewritten for WebHDFS GET redirects, so redirects are
+   // not followed automatically on this request.
+   get.getParams().setBooleanParameter("http.protocol.handle-redirects", false);
+   copyRequestHeaderFields(get, request);
+   executeRequest(get, request, response);
+ }
+
+ /**
+  * Dispatches an HTTP OPTIONS request to {@code url}.
+  *
+  * NOTE(review): unlike doGet, doPut, doPost and doDelete, this method does not
+  * call copyRequestHeaderFields before executing - confirm that is intentional.
+  *
+  * @param url the backend URL to call
+  * @param request the inbound client request
+  * @param response the response to be populated for the client
+  * @throws IOException if dispatching the request fails
+  * @throws URISyntaxException declared for callers of this dispatch method
+  */
+ @Override
+ public void doOptions(URI url, HttpServletRequest request, HttpServletResponse response)
+     throws IOException, URISyntaxException {
+   HttpOptions options = new HttpOptions(url);
+   executeRequest(options, request, response);
+ }
+
+ /**
+  * Dispatches an HTTP PUT to {@code url}, forwarding the inbound request's body
+  * (via {@code createRequestEntity}) and header fields.
+  *
+  * @param url the backend URL to call
+  * @param request the inbound client request supplying body and headers
+  * @param response the response to be populated for the client
+  * @throws IOException if building the entity or dispatching the request fails
+  * @throws URISyntaxException declared for callers of this dispatch method
+  */
+ @Override
+ public void doPut(URI url, HttpServletRequest request, HttpServletResponse response)
+     throws IOException, URISyntaxException {
+   HttpPut put = new HttpPut(url);
+   put.setEntity(createRequestEntity(request));
+   copyRequestHeaderFields(put, request);
+   executeRequest(put, request, response);
+ }
+
+ /**
+  * Dispatches an HTTP POST to {@code url}, forwarding the inbound request's body
+  * (via {@code createRequestEntity}) and header fields.
+  *
+  * @param url the backend URL to call
+  * @param request the inbound client request supplying body and headers
+  * @param response the response to be populated for the client
+  * @throws IOException if building the entity or dispatching the request fails
+  * @throws URISyntaxException declared for callers of this dispatch method
+  */
+ @Override
+ public void doPost(URI url, HttpServletRequest request, HttpServletResponse response)
+     throws IOException, URISyntaxException {
+   HttpPost post = new HttpPost(url);
+   post.setEntity(createRequestEntity(request));
+   copyRequestHeaderFields(post, request);
+   executeRequest(post, request, response);
+ }
+
+ /**
+  * Dispatches an HTTP DELETE to {@code url}, forwarding the inbound request's
+  * header fields.
+  *
+  * @param url the backend URL to call
+  * @param request the inbound client request
+  * @param response the response to be populated for the client
+  * @throws IOException if dispatching the request fails
+  * @throws URISyntaxException declared for callers of this dispatch method
+  */
+ @Override
+ public void doDelete(URI url, HttpServletRequest request, HttpServletResponse response)
+     throws IOException, URISyntaxException {
+   HttpDelete delete = new HttpDelete(url);
+   copyRequestHeaderFields(delete, request);
+   executeRequest(delete, request, response);
+ }
+
+ /**
+  * Returns the replay buffer size currently set on this dispatch.
+  *
+  * @return the value last stored by {@code setReplayBufferSize}
+  */
+ protected int getReplayBufferSize() {
+   return this.replayBufferSize;
+ }
+
+ /**
+  * Stores the replay buffer size. The parameter carries an annotated default
+  * of {@code "8"}; presumably the configuration injector supplies that when no
+  * value is configured - confirm against the {@code @Configure} handling.
+  * {@code createRequestEntity} multiplies this value by 1024 when sizing its
+  * capped buffer.
+  *
+  * @param size the replay buffer size to store
+  */
+ @Configure
+ protected void setReplayBufferSize(@Default("8") int size) {
+   this.replayBufferSize = size;
+ }
+
+ /**
+  * Returns the names of backend response headers that
+  * {@code writeOutboundResponse} leaves out when copying headers to the client.
+  * The set is returned as-is, without copying.
+  *
+  * @return the exclusion set held by this dispatch
+  */
+ public Set<String> getOutboundResponseExcludeHeaders() {
+   return this.outboundResponseExcludeHeaders;
+ }
+}
http://git-wip-us.apache.org/repos/asf/knox/blob/016a47dc/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/GatewayDispatchFilter.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/GatewayDispatchFilter.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/GatewayDispatchFilter.java
index 897d5de..c23d9dd 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/GatewayDispatchFilter.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/dispatch/GatewayDispatchFilter.java
@@ -64,8 +64,10 @@ public class GatewayDispatchFilter extends AbstractGatewayFilter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
super.init(filterConfig);
- String dispatchImpl = filterConfig.getInitParameter("dispatch-impl");
- dispatch = newDispatch(dispatchImpl);
+ if (dispatch == null) {
+ String dispatchImpl = filterConfig.getInitParameter("dispatch-impl");
+ dispatch = newDispatch(dispatchImpl);
+ }
configuration().target(dispatch).source(filterConfig).inject();
httpClient = HttpClients.custom().setDefaultCookieStore(new NoCookieStore()).build();
//[sumit] this can perhaps be stashed in the servlet context to increase sharing of the client