You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by km...@apache.org on 2015/01/28 15:44:15 UTC

knox git commit: KNOX-492: Support service level replayBufferLimit for Oozie, Hive and HBase.

Repository: knox
Updated Branches:
  refs/heads/master 8a2ae7d11 -> 7c4b44af6


KNOX-492: Support service level replayBufferLimit for Oozie, Hive and HBase.


Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/7c4b44af
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/7c4b44af
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/7c4b44af

Branch: refs/heads/master
Commit: 7c4b44af66d7fce69b854a1371ea2a4fbe51f637
Parents: 8a2ae7d
Author: Kevin Minder <ke...@hortonworks.com>
Authored: Wed Jan 28 09:43:31 2015 -0500
Committer: Kevin Minder <ke...@hortonworks.com>
Committed: Wed Jan 28 09:43:31 2015 -0500

----------------------------------------------------------------------
 .../impl/DispatchDeploymentContributor.java     |  22 +-
 .../HBaseDispatchDeploymentContributor.java     |  22 +-
 .../hive/HiveDispatchDeploymentContributor.java |  25 ++-
 .../oozie/OozieDeploymentContributor.java       |   7 +-
 .../deploy/DeploymentFactoryFuncTest.java       | 205 ++++++++++++++++++-
 5 files changed, 238 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
index e26d7b0..ab3448b 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/deploy/impl/DispatchDeploymentContributor.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.gateway.topology.Provider;
 import org.apache.hadoop.gateway.topology.Service;
 
 import java.util.List;
+import java.util.Map;
 
 public class DispatchDeploymentContributor extends ProviderDeploymentContributorBase {
   
@@ -47,17 +48,22 @@ public class DispatchDeploymentContributor extends ProviderDeploymentContributor
 
   @Override
   public void contributeFilter( DeploymentContext context, Provider provider, Service service, ResourceDescriptor resource, List<FilterParamDescriptor> params ) {
-    String replayBufferSize = DEFAULT_REPLAY_BUFFER_SIZE;
-    if (params != null) {
-      for (FilterParamDescriptor paramDescriptor : params) {
-        if (REPLAY_BUFFER_SIZE_PARAM.equals( paramDescriptor.name() )) {
-          replayBufferSize = paramDescriptor.value();
-          break;
+    FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HttpClientDispatch.class );
+
+    FilterParamDescriptor filterParam = filter.param().name( REPLAY_BUFFER_SIZE_PARAM ).value( DEFAULT_REPLAY_BUFFER_SIZE );
+    for ( Map.Entry<String,String> serviceParam : service.getParams().entrySet() ) {
+      if ( REPLAY_BUFFER_SIZE_PARAM.equals( serviceParam.getKey() ) ) {
+        filterParam.value( serviceParam.getValue() );
+      }
+    }
+    if ( params != null ) {
+      for ( FilterParamDescriptor customParam : params ) {
+        if ( REPLAY_BUFFER_SIZE_PARAM.equals( customParam.name() ) ) {
+          filterParam.value( customParam.value() );
         }
       }
     }
-    FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HttpClientDispatch.class );
-    filter.param().name("replayBufferSize").value(replayBufferSize);
+
     if( context.getGatewayConfig().isHadoopKerberosSecured() ) {
       filter.param().name("kerberos").value("true");
     }

http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java b/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java
index 85a7911..d2b4ce0 100644
--- a/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java
+++ b/gateway-service-hbase/src/main/java/org/apache/hadoop/gateway/hbase/HBaseDispatchDeploymentContributor.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.gateway.topology.Provider;
 import org.apache.hadoop.gateway.topology.Service;
 
 import java.util.List;
+import java.util.Map;
 
 public class HBaseDispatchDeploymentContributor extends ProviderDeploymentContributorBase {
   
@@ -46,17 +47,22 @@ public class HBaseDispatchDeploymentContributor extends ProviderDeploymentContri
 
   @Override
   public void contributeFilter( DeploymentContext context, Provider provider, Service service, ResourceDescriptor resource, List<FilterParamDescriptor> params ) {
-    String replayBufferSize = DEFAULT_REPLAY_BUFFER_SIZE;
-    if (params != null) {
-      for (FilterParamDescriptor paramDescriptor : params) {
-        if (REPLAY_BUFFER_SIZE_PARAM.equals( paramDescriptor.name() )) {
-          replayBufferSize = paramDescriptor.value();
-          break;
+    FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HBaseHttpClientDispatch.class );
+
+    FilterParamDescriptor filterParam = filter.param().name( REPLAY_BUFFER_SIZE_PARAM ).value( DEFAULT_REPLAY_BUFFER_SIZE );
+    for ( Map.Entry<String,String> serviceParam : service.getParams().entrySet() ) {
+      if ( REPLAY_BUFFER_SIZE_PARAM.equals( serviceParam.getKey() ) ) {
+        filterParam.value( serviceParam.getValue() );
+      }
+    }
+    if ( params != null ) {
+      for ( FilterParamDescriptor customParam : params ) {
+        if ( REPLAY_BUFFER_SIZE_PARAM.equals( customParam.name() ) ) {
+          filterParam.value( customParam.value() );
         }
       }
     }
-    FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HBaseHttpClientDispatch.class );
-    filter.param().name("replayBufferSize").value(replayBufferSize);
+
     if( context.getGatewayConfig().isHadoopKerberosSecured() ) {
       filter.param().name("kerberos").value("true");
     }

http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java b/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java
index 135a08f..00b13d9 100644
--- a/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java
+++ b/gateway-service-hive/src/main/java/org/apache/hadoop/gateway/hive/HiveDispatchDeploymentContributor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.gateway.hive;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.gateway.deploy.DeploymentContext;
 import org.apache.hadoop.gateway.deploy.ProviderDeploymentContributorBase;
@@ -49,21 +50,25 @@ public class HiveDispatchDeploymentContributor extends ProviderDeploymentContrib
 
   @Override
   public void contributeFilter( DeploymentContext context, Provider provider, Service service, ResourceDescriptor resource, List<FilterParamDescriptor> params ) {
-    String replayBufferSize = DEFAULT_REPLAY_BUFFER_SIZE;
-    if (params != null) {
-      for (FilterParamDescriptor paramDescriptor : params) {
-        if (REPLAY_BUFFER_SIZE_PARAM.equals( paramDescriptor.name() )) {
-          replayBufferSize = paramDescriptor.value();
-          break;
+    FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HiveHttpClientDispatch.class );
+
+    FilterParamDescriptor filterParam = filter.param().name( REPLAY_BUFFER_SIZE_PARAM ).value( DEFAULT_REPLAY_BUFFER_SIZE );
+    for ( Map.Entry<String,String> serviceParam : service.getParams().entrySet() ) {
+      if ( REPLAY_BUFFER_SIZE_PARAM.equals( serviceParam.getKey() ) ) {
+        filterParam.value( serviceParam.getValue() );
+      }
+    }
+    if ( params != null ) {
+      for ( FilterParamDescriptor customParam : params ) {
+        if ( REPLAY_BUFFER_SIZE_PARAM.equals( customParam.name() ) ) {
+          filterParam.value( customParam.value() );
         }
       }
     }
-    FilterDescriptor filter = resource.addFilter().name( getName() ).role( getRole() ).impl( HiveHttpClientDispatch.class );
-    filter.param().name("replayBufferSize").value(replayBufferSize);
+
     if( context.getGatewayConfig().isHadoopKerberosSecured() ) {
       filter.param().name("kerberos").value("true");
-    }
-    else {
+    } else {
       filter.param().name("basicAuthPreemptive").value("true");
     }
   }

http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
----------------------------------------------------------------------
diff --git a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java b/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
index 4d56faa..0e9ffae 100644
--- a/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
+++ b/gateway-service-oozie/src/main/java/org/apache/hadoop/gateway/oozie/OozieDeploymentContributor.java
@@ -106,12 +106,7 @@ public class OozieDeploymentContributor extends ServiceDeploymentContributorBase
 
   private void addDispatchFilter(DeploymentContext context, Service service,
       ResourceDescriptor resource) {
-    List<FilterParamDescriptor> filterParams = new ArrayList<FilterParamDescriptor>();
-    FilterParamDescriptor filterParamDescriptor = resource.createFilterParam();
-    filterParamDescriptor.name(REPLAY_BUFFER_SIZE_PARAM);
-    filterParamDescriptor.value( DEFAULT_REPLAY_BUFFER_SIZE );
-    filterParams.add(filterParamDescriptor);
-    context.contributeFilter(service, resource, "dispatch", "http-client", filterParams);
+    context.contributeFilter(service, resource, "dispatch", "http-client", null );
   }
 
   UrlRewriteRulesDescriptor loadRulesFromTemplate() throws IOException {

http://git-wip-us.apache.org/repos/asf/knox/blob/7c4b44af/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java
----------------------------------------------------------------------
diff --git a/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java b/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java
index 4cb9355..fc699b1 100644
--- a/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java
+++ b/gateway-test/src/test/java/org/apache/hadoop/gateway/deploy/DeploymentFactoryFuncTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.gateway.deploy;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.gateway.GatewayTestConfig;
 import org.apache.hadoop.gateway.config.GatewayConfig;
 import org.apache.hadoop.gateway.services.DefaultGatewayServices;
@@ -27,28 +28,41 @@ import org.apache.hadoop.gateway.topology.Service;
 import org.apache.hadoop.gateway.topology.Topology;
 import org.apache.hadoop.test.log.NoOpAppender;
 import org.apache.log4j.Appender;
-import org.jboss.shrinkwrap.api.exporter.ExplodedExporter;
 import org.jboss.shrinkwrap.api.spec.WebArchive;
 import org.junit.Test;
 import org.w3c.dom.Document;
+import org.w3c.dom.Node;
 import org.xml.sax.InputSource;
 import org.xml.sax.SAXException;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
 import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.StringWriter;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
+import static org.hamcrest.core.IsNot.not;
 import static org.hamcrest.xml.HasXPath.hasXPath;
 import static org.junit.Assert.fail;
 
@@ -356,21 +370,190 @@ public class DeploymentFactoryFuncTest {
    }
 
 
-   private Document parse( InputStream stream ) throws IOException, SAXException, ParserConfigurationException {
+  @Test
+  public void testDeploymentWithoutReplayBufferSize() throws Exception {
+    GatewayConfig config = new GatewayTestConfig();
+    File targetDir = new File(System.getProperty("user.dir"), "target");
+    File gatewayDir = new File(targetDir, "gateway-home-" + UUID.randomUUID());
+    gatewayDir.mkdirs();
+    ((GatewayTestConfig) config).setGatewayHomeDir(gatewayDir.getAbsolutePath());
+    File deployDir = new File(config.getGatewayDeploymentDir());
+    deployDir.mkdirs();
+
+    DefaultGatewayServices srvcs = new DefaultGatewayServices();
+    Map<String, String> options = new HashMap<String, String>();
+    options.put("persist-master", "false");
+    options.put("master", "password");
+    try {
+      DeploymentFactory.setGatewayServices(srvcs);
+      srvcs.init(config, options);
+    } catch (ServiceLifecycleException e) {
+      e.printStackTrace(); // I18N not required.
+    }
+
+    Service service;
+    Topology topology = new Topology();
+    topology.setName( "test-cluster" );
+
+    service = new Service();
+    service.setRole( "HIVE" );
+    service.setUrls( Arrays.asList( new String[]{ "http://hive-host:50001/" } ) );
+    topology.addService( service );
+
+    service = new Service();
+    service.setRole( "WEBHBASE" );
+    service.setUrls( Arrays.asList( new String[]{ "http://hbase-host:50002/" } ) );
+    topology.addService( service );
+
+    service = new Service();
+    service.setRole( "OOZIE" );
+    service.setUrls( Arrays.asList( new String[]{ "http://hbase-host:50003/" } ) );
+    topology.addService( service );
+
+    WebArchive war = DeploymentFactory.createDeployment( config, topology );
+    Document doc = parse( war.get( "WEB-INF/gateway.xml" ).getAsset().openStream() );
+    //dump( doc );
+
+    Node resourceNode, filterNode, paramNode;
+    String value;
+
+    resourceNode = node( doc, "gateway/resource[role/text()='HIVE']" );
+    assertThat( resourceNode, is(not(nullValue())));
+    filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+    assertThat( filterNode, is(not(nullValue())));
+    paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+    value = value( paramNode, "value/text()" );
+    assertThat( value, is( "8" ) ) ;
+
+    resourceNode = node( doc, "gateway/resource[role/text()='WEBHBASE']" );
+    assertThat( resourceNode, is(not(nullValue())));
+    filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+    assertThat( filterNode, is(not(nullValue())));
+    paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+    value = value( paramNode, "value/text()" );
+    assertThat( value, is( "8" ) ) ;
+
+    resourceNode = node( doc, "gateway/resource[role/text()='OOZIE']" );
+    assertThat( resourceNode, is(not(nullValue())));
+    filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+    assertThat( filterNode, is(not(nullValue())));
+    paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+    value = value( paramNode, "value/text()" );
+    assertThat( value, is( "8" ) ) ;
+
+    FileUtils.deleteQuietly( deployDir );
+  }
+
+  @Test
+  public void testDeploymentWithReplayBufferSize() throws Exception {
+    GatewayConfig config = new GatewayTestConfig();
+    File targetDir = new File(System.getProperty("user.dir"), "target");
+    File gatewayDir = new File(targetDir, "gateway-home-" + UUID.randomUUID());
+    gatewayDir.mkdirs();
+    ((GatewayTestConfig) config).setGatewayHomeDir(gatewayDir.getAbsolutePath());
+    File deployDir = new File(config.getGatewayDeploymentDir());
+    deployDir.mkdirs();
+
+    DefaultGatewayServices srvcs = new DefaultGatewayServices();
+    Map<String, String> options = new HashMap<String, String>();
+    options.put("persist-master", "false");
+    options.put("master", "password");
+    try {
+      DeploymentFactory.setGatewayServices(srvcs);
+      srvcs.init(config, options);
+    } catch (ServiceLifecycleException e) {
+      e.printStackTrace(); // I18N not required.
+    }
+
+    Service service;
+    Param param;
+    Topology topology = new Topology();
+    topology.setName( "test-cluster" );
+
+    service = new Service();
+    service.setRole( "HIVE" );
+    service.setUrls( Arrays.asList( new String[]{ "http://hive-host:50001/" } ) );
+    param = new Param();
+    param.setName( "replayBufferSize" );
+    param.setValue( "17" );
+    service.addParam( param );
+    topology.addService( service );
+
+    service = new Service();
+    service.setRole( "WEBHBASE" );
+    service.setUrls( Arrays.asList( new String[]{ "http://hbase-host:50002/" } ) );
+    param = new Param();
+    param.setName( "replayBufferSize" );
+    param.setValue( "33" );
+    service.addParam( param );
+    topology.addService( service );
+
+    service = new Service();
+    service.setRole( "OOZIE" );
+    service.setUrls( Arrays.asList( new String[]{ "http://hbase-host:50003/" } ) );
+    param = new Param();
+    param.setName( "replayBufferSize" );
+    param.setValue( "65" );
+    service.addParam( param );
+    topology.addService( service );
+
+    WebArchive war = DeploymentFactory.createDeployment( config, topology );
+    Document doc = parse( war.get( "WEB-INF/gateway.xml" ).getAsset().openStream() );
+    //dump( doc );
+
+    Node resourceNode, filterNode, paramNode;
+    String value;
+
+    resourceNode = node( doc, "gateway/resource[role/text()='HIVE']" );
+    assertThat( resourceNode, is(not(nullValue())));
+    filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+    assertThat( filterNode, is(not(nullValue())));
+    paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+    value = value( paramNode, "value/text()" );
+    assertThat( value, is( "17" ) ) ;
+
+    resourceNode = node( doc, "gateway/resource[role/text()='WEBHBASE']" );
+    assertThat( resourceNode, is(not(nullValue())));
+    filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+    assertThat( filterNode, is(not(nullValue())));
+    paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+    value = value( paramNode, "value/text()" );
+    assertThat( value, is( "33" ) ) ;
+
+    resourceNode = node( doc, "gateway/resource[role/text()='OOZIE']" );
+    assertThat( resourceNode, is(not(nullValue())));
+    filterNode = node( resourceNode, "filter[role/text()='dispatch']" );
+    assertThat( filterNode, is(not(nullValue())));
+    paramNode = node( filterNode, "param[name/text()='replayBufferSize']" );
+    value = value( paramNode, "value/text()" );
+    assertThat( value, is( "65" ) ) ;
+
+    FileUtils.deleteQuietly( deployDir );
+  }
+
+  private Document parse( InputStream stream ) throws IOException, SAXException, ParserConfigurationException {
     DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
     DocumentBuilder builder = factory.newDocumentBuilder();
     InputSource source = new InputSource( stream );
     return builder.parse( source );
   }
 
-//  private void dump( Document document ) throws TransformerException {
-//    Transformer transformer = TransformerFactory.newInstance().newTransformer();
-//    transformer.setOutputProperty( OutputKeys.INDENT, "yes" );
-//    StreamResult result = new StreamResult( new StringWriter() );
-//    DOMSource source = new DOMSource( document );
-//    transformer.transform( source, result );
-//    String xmlString = result.getWriter().toString();
-//    System.out.println( xmlString );
-//  }
+  private void dump( Document document ) throws TransformerException {
+    Transformer transformer = TransformerFactory.newInstance().newTransformer();
+    transformer.setOutputProperty( OutputKeys.INDENT, "yes" );
+    StreamResult result = new StreamResult( new StringWriter() );
+    DOMSource source = new DOMSource( document );
+    transformer.transform( source, result );
+    String xmlString = result.getWriter().toString();
+    System.out.println( xmlString );
+  }
+
+  private Node node( Node scope, String expression ) throws XPathExpressionException {
+    return (Node)XPathFactory.newInstance().newXPath().compile( expression ).evaluate( scope, XPathConstants.NODE );
+  }
+
+  private String value( Node scope, String expression ) throws XPathExpressionException {
+    return XPathFactory.newInstance().newXPath().compile( expression ).evaluate( scope );
+  }
 
 }