You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by km...@apache.org on 2015/01/28 15:44:15 UTC
knox git commit: KNOX-492: Support service level replayBufferLimit
for Oozie, Hive and HBase.
Repository: knox
Updated Branches:
refs/heads/master 8a2ae7d11 -> 7c4b44af6
KNOX-492: Support service level replayBufferLimit for Oozie, Hive and HBase.
Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/7c4b44af
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/7c4b44af
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/7c4b44af
Branch: refs/heads/master
Commit: 7c4b44af66d7fce69b854a1371ea2a4fbe51f637
Parents: 8a2ae7d
Author: Kevin Minder <ke...@hortonworks.com>
Authored: Wed Jan 28 09:43:31 2015 -0500
Committer: Kevin Minder <ke...@hortonworks.com>
Committed: Wed Jan 28 09:43:31 2015 -0500
----------------------------------------------------------------------
.../impl/DispatchDeploymentContributor.java | 22 +-
.../HBaseDispatchDeploymentContributor.java | 22 +-
.../hive/HiveDispatchDeploymentContributor.java | 25 ++-
.../oozie/OozieDeploymentContributor.java | 7 +-
.../deploy/DeploymentFactoryFuncTest.java | 205 ++++++++++++++++++-
5 files changed, 238 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
index e26d7b0..ab3448b 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.gateway.topology.Provider;
import org.apache.hadoop.gateway.topology.Service;
import java.util.List;
+import java.util.Map;
public class DispatchDeploymentContributor extends ProviderDeploymentContributorBase {
@@ -47,17 +48,22 @@ public class DispatchDeploymentContributor extends ProviderDeploymentContributor
@Override
public void contributeFilter( DeploymentContext context, Provider provider, Service service, ResourceDescriptor resource, List<FilterParamDescriptor> params ) {
- String replayBufferSize = DEFAULT_REPLAY_BUFFER_SIZE;
- if (params != null) {
- for (FilterParamDescriptor paramDescriptor : params) {
- if (REPLAY_BUFFER_SIZE_PARAM.equals( paramDescriptor.name() )) {
- replayBufferSize = paramDescriptor.value();
- break;
+ FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HttpClientDispatch.class );
+
+ FilterParamDescriptor filterParam = filter.param().name( REPLAY_BUFFER_SIZE_PARAM ).value( DEFAULT_REPLAY_BUFFER_SIZE );
+ for ( Map.Entry<String,String> serviceParam : service.getParams().entrySet() ) {
+ if ( REPLAY_BUFFER_SIZE_PARAM.equals( serviceParam.getKey() ) ) {
+ filterParam.value( serviceParam.getValue() );
+ }
+ }
+ if ( params != null ) {
+ for ( FilterParamDescriptor customParam : params ) {
+ if ( REPLAY_BUFFER_SIZE_PARAM.equals( customParam.name() ) ) {
+ filterParam.value( customParam.value() );
}
}
}
- FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HttpClientDispatch.class );
- filter.param().name("replayBufferSize").value(replayBufferSize);
+
if( context.getGatewayConfig().isHadoopKerberosSecured() ) {
filter.param().name("kerberos").value("true");
}
http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java b/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java
index 85a7911..d2b4ce0 100644
--- a/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java
+++ b/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.gateway.topology.Provider;
import org.apache.hadoop.gateway.topology.Service;
import java.util.List;
+import java.util.Map;
public class HBaseDispatchDeploymentContributor extends ProviderDeploymentContributorBase {
@@ -46,17 +47,22 @@ public class HBaseDispatchDeploymentContributor extends ProviderDeploymentContri
@Override
public void contributeFilter( DeploymentContext context, Provider provider, Service service, ResourceDescriptor resource, List<FilterParamDescriptor> params ) {
- String replayBufferSize = DEFAULT_REPLAY_BUFFER_SIZE;
- if (params != null) {
- for (FilterParamDescriptor paramDescriptor : params) {
- if (REPLAY_BUFFER_SIZE_PARAM.equals( paramDescriptor.name() )) {
- replayBufferSize = paramDescriptor.value();
- break;
+ FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HBaseHttpClientDispatch.class );
+
+ FilterParamDescriptor filterParam = filter.param().name( REPLAY_BUFFER_SIZE_PARAM ).value( DEFAULT_REPLAY_BUFFER_SIZE );
+ for ( Map.Entry<String,String> serviceParam : service.getParams().entrySet() ) {
+ if ( REPLAY_BUFFER_SIZE_PARAM.equals( serviceParam.getKey() ) ) {
+ filterParam.value( serviceParam.getValue() );
+ }
+ }
+ if ( params != null ) {
+ for ( FilterParamDescriptor customParam : params ) {
+ if ( REPLAY_BUFFER_SIZE_PARAM.equals( customParam.name() ) ) {
+ filterParam.value( customParam.value() );
}
}
}
- FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HBaseHttpClientDispatch.class );
- filter.param().name("replayBufferSize").value(replayBufferSize);
+
if( context.getGatewayConfig().isHadoopKerberosSecured() ) {
filter.param().name("kerberos").value("true");
}
http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java b/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java
index 135a08f..00b13d9 100644
--- a/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java
+++ b/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.gateway.hive;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.gateway.deploy.DeploymentContext;
import org.apache.hadoop.gateway.deploy.ProviderDeploymentContributorBase;
@@ -49,21 +50,25 @@ public class HiveDispatchDeploymentContributor extends ProviderDeploymentContrib
@Override
public void contributeFilter( DeploymentContext context, Provider provider, Service service, ResourceDescriptor resource, List<FilterParamDescriptor> params ) {
- String replayBufferSize = DEFAULT_REPLAY_BUFFER_SIZE;
- if (params != null) {
- for (FilterParamDescriptor paramDescriptor : params) {
- if (REPLAY_BUFFER_SIZE_PARAM.equals( paramDescriptor.name() )) {
- replayBufferSize = paramDescriptor.value();
- break;
+ FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HiveHttpClientDispatch.class );
+
+ FilterParamDescriptor filterParam = filter.param().name( REPLAY_BUFFER_SIZE_PARAM ).value( DEFAULT_REPLAY_BUFFER_SIZE );
+ for ( Map.Entry<String,String> serviceParam : service.getParams().entrySet() ) {
+ if ( REPLAY_BUFFER_SIZE_PARAM.equals( serviceParam.getKey() ) ) {
+ filterParam.value( serviceParam.getValue() );
+ }
+ }
+ if ( params != null ) {
+ for ( FilterParamDescriptor customParam : params ) {
+ if ( REPLAY_BUFFER_SIZE_PARAM.equals( customParam.name() ) ) {
+ filterParam.value( customParam.value() );
}
}
}
- FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HiveHttpClientDispatch.class );
- filter.param().name("replayBufferSize").value(replayBufferSize);
+
if( context.getGatewayConfig().isHadoopKerberosSecured() ) {
filter.param().name("kerberos").value("true");
- }
- else {
+ } else {
filter.param().name("basicAuthPreemptive").value("true");
}
}
http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java b/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
index 4d56faa..0e9ffae 100644
--- a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
+++ b/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
@@ -106,12 +106,7 @@ public class OozieDeploymentContributor extends ServiceDeploymentContributorBase
private void addDispatchFilter(DeploymentContext context, Service service,
ResourceDescriptor resource) {
- List<FilterParamDescriptor> filterParams = new ArrayList<FilterParamDescriptor>();
- FilterParamDescriptor filterParamDescriptor = resource.createFilterParam();
- filterParamDescriptor.name(REPLAY_BUFFER_SIZE_PARAM);
- filterParamDescriptor.value( DEFAULT_REPLAY_BUFFER_SIZE );
- filterParams.add(filterParamDescriptor);
- context.contributeFilter(service, resource, "dispatch", "http-client", filterParams);
+ context.contributeFilter(service, resource, "dispatch", "http-client", null );
}
UrlRewriteRulesDescriptor loadRulesFromTemplate() throws IOException {
http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java
----------------------------------------------------------------------
diff --git a/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java b/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java
index 4cb9355..fc699b1 100644
--- a/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java
+++ b/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.gateway.deploy;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.gateway.GatewayTestConfig;
import org.apache.hadoop.gateway.config.GatewayConfig;
import org.apache.hadoop.gateway.services.DefaultGatewayServices;
@@ -27,28 +28,41 @@ import org.apache.hadoop.gateway.topology.Service;
import org.apache.hadoop.gateway.topology.Topology;
import org.apache.hadoop.test.log.NoOpAppender;
import org.apache.log4j.Appender;
-import org.jboss.shrinkwrap.api.exporter.ExplodedExporter;
import org.jboss.shrinkwrap.api.spec.WebArchive;
import org.junit.Test;
import org.w3c.dom.Document;
+import org.w3c.dom.Node;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.StringWriter;
import java.net.URISyntaxException;
+import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
+import static org.hamcrest.core.IsNot.not;
import static org.hamcrest.xml.HasXPath.hasXPath;
import static org.junit.Assert.fail;
@@ -356,21 +370,190 @@ public class DeploymentFactoryFuncTest {
}
- private Document parse( InputStream stream ) throws IOException, SAXException, ParserConfigurationException {
+ @Test
+ public void testDeploymentWithoutReplayBufferSize() throws Exception {
+ GatewayConfig config = new GatewayTestConfig();
+ File targetDir = new File(System.getProperty("user.dir"), "target");
+ File gatewayDir = new File(targetDir, "gateway-home-" + UUID.randomUUID());
+ gatewayDir.mkdirs();
+ ((GatewayTestConfig) config).setGatewayHomeDir(gatewayDir.getAbsolutePath());
+ File deployDir = new File(config.getGatewayDeploymentDir());
+ deployDir.mkdirs();
+
+ DefaultGatewayServices srvcs = new DefaultGatewayServices();
+ Map<String, String> options = new HashMap<String, String>();
+ options.put("persist-master", "false");
+ options.put("master", "password");
+ try {
+ DeploymentFactory.setGatewayServices(srvcs);
+ srvcs.init(config, options);
+ } catch (ServiceLifecycleException e) {
+ e.printStackTrace(); // I18N not required.
+ }
+
+ Service service;
+ Topology topology = new Topology();
+ topology.setName( "test-cluster" );
+
+ service = new Service();
+ service.setRole( "HIVE" );
+ service.setUrls( Arrays.asList( new String[]{ "http://hive-host:50001/" } ) );
+ topology.addService( service );
+
+ service = new Service();
+ service.setRole( "WEBHBASE" );
+ service.setUrls( Arrays.asList( new String[]{ "http://hbase-host:50002/" } ) );
+ topology.addService( service );
+
+ service = new Service();
+ service.setRole( "OOZIE" );
+ service.setUrls( Arrays.asList( new String[]{ "http://hbase-host:50003/" } ) );
+ topology.addService( service );
+
+ WebArchive war = DeploymentFactory.createDeployment( config, topology );
+ Document doc = parse( war.get( "WEB-INF/gateway.xml" ).getAsset().openStream() );
+ //dump( doc );
+
+ Node resourceNode, filterNode, paramNode;
+ String value;
+
+ resourceNode = node( doc, "gateway/resource[role/text()='HIVE']" );
+ assertThat( resourceNode, is(not(nullValue())));
+ filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+ assertThat( filterNode, is(not(nullValue())));
+ paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+ value = value( paramNode, "value/text()" );
+ assertThat( value, is( "8" ) ) ;
+
+ resourceNode = node( doc, "gateway/resource[role/text()='WEBHBASE']" );
+ assertThat( resourceNode, is(not(nullValue())));
+ filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+ assertThat( filterNode, is(not(nullValue())));
+ paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+ value = value( paramNode, "value/text()" );
+ assertThat( value, is( "8" ) ) ;
+
+ resourceNode = node( doc, "gateway/resource[role/text()='OOZIE']" );
+ assertThat( resourceNode, is(not(nullValue())));
+ filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+ assertThat( filterNode, is(not(nullValue())));
+ paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+ value = value( paramNode, "value/text()" );
+ assertThat( value, is( "8" ) ) ;
+
+ FileUtils.deleteQuietly( deployDir );
+ }
+
+ @Test
+ public void testDeploymentWithReplayBufferSize() throws Exception {
+ GatewayConfig config = new GatewayTestConfig();
+ File targetDir = new File(System.getProperty("user.dir"), "target");
+ File gatewayDir = new File(targetDir, "gateway-home-" + UUID.randomUUID());
+ gatewayDir.mkdirs();
+ ((GatewayTestConfig) config).setGatewayHomeDir(gatewayDir.getAbsolutePath());
+ File deployDir = new File(config.getGatewayDeploymentDir());
+ deployDir.mkdirs();
+
+ DefaultGatewayServices srvcs = new DefaultGatewayServices();
+ Map<String, String> options = new HashMap<String, String>();
+ options.put("persist-master", "false");
+ options.put("master", "password");
+ try {
+ DeploymentFactory.setGatewayServices(srvcs);
+ srvcs.init(config, options);
+ } catch (ServiceLifecycleException e) {
+ e.printStackTrace(); // I18N not required.
+ }
+
+ Service service;
+ Param param;
+ Topology topology = new Topology();
+ topology.setName( "test-cluster" );
+
+ service = new Service();
+ service.setRole( "HIVE" );
+ service.setUrls( Arrays.asList( new String[]{ "http://hive-host:50001/" } ) );
+ param = new Param();
+ param.setName( "replayBufferSize" );
+ param.setValue( "17" );
+ service.addParam( param );
+ topology.addService( service );
+
+ service = new Service();
+ service.setRole( "WEBHBASE" );
+ service.setUrls( Arrays.asList( new String[]{ "http://hbase-host:50002/" } ) );
+ param = new Param();
+ param.setName( "replayBufferSize" );
+ param.setValue( "33" );
+ service.addParam( param );
+ topology.addService( service );
+
+ service = new Service();
+ service.setRole( "OOZIE" );
+ service.setUrls( Arrays.asList( new String[]{ "http://hbase-host:50003/" } ) );
+ param = new Param();
+ param.setName( "replayBufferSize" );
+ param.setValue( "65" );
+ service.addParam( param );
+ topology.addService( service );
+
+ WebArchive war = DeploymentFactory.createDeployment( config, topology );
+ Document doc = parse( war.get( "WEB-INF/gateway.xml" ).getAsset().openStream() );
+ //dump( doc );
+
+ Node resourceNode, filterNode, paramNode;
+ String value;
+
+ resourceNode = node( doc, "gateway/resource[role/text()='HIVE']" );
+ assertThat( resourceNode, is(not(nullValue())));
+ filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+ assertThat( filterNode, is(not(nullValue())));
+ paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+ value = value( paramNode, "value/text()" );
+ assertThat( value, is( "17" ) ) ;
+
+ resourceNode = node( doc, "gateway/resource[role/text()='WEBHBASE']" );
+ assertThat( resourceNode, is(not(nullValue())));
+ filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+ assertThat( filterNode, is(not(nullValue())));
+ paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+ value = value( paramNode, "value/text()" );
+ assertThat( value, is( "33" ) ) ;
+
+ resourceNode = node( doc, "gateway/resource[role/text()='OOZIE']" );
+ assertThat( resourceNode, is(not(nullValue())));
+ filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+ assertThat( filterNode, is(not(nullValue())));
+ paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+ value = value( paramNode, "value/text()" );
+ assertThat( value, is( "65" ) ) ;
+
+ FileUtils.deleteQuietly( deployDir );
+ }
+
+ private Document parse( InputStream stream ) throws IOException, SAXException, ParserConfigurationException {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder builder = factory.newDocumentBuilder();
InputSource source = new InputSource( stream );
return builder.parse( source );
}
-// private void dump( Document document ) throws TransformerException {
-// Transformer transformer = TransformerFactory.newInstance().newTransformer();
-// transformer.setOutputProperty( OutputKeys.INDENT, "yes" );
-// StreamResult result = new StreamResult( new StringWriter() );
-// DOMSource source = new DOMSource( document );
-// transformer.transform( source, result );
-// String xmlString = result.getWriter().toString();
-// System.out.println( xmlString );
-// }
+ private void dump( Document document ) throws TransformerException {
+ Transformer transformer = TransformerFactory.newInstance().newTransformer();
+ transformer.setOutputProperty( OutputKeys.INDENT, "yes" );
+ StreamResult result = new StreamResult( new StringWriter() );
+ DOMSource source = new DOMSource( document );
+ transformer.transform( source, result );
+ String xmlString = result.getWriter().toString();
+ System.out.println( xmlString );
+ }
+
+ private Node node( Node scope, String expression ) throws XPathExpressionException {
+ return (Node)XPathFactory.newInstance().newXPath().compile( expression ).evaluate( scope, XPathConstants.NODE );
+ }
+
+ private String value( Node scope, String expression ) throws XPathExpressionException {
+ return XPathFactory.newInstance().newXPath().compile( expression ).evaluate( scope );
+ }
}