You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:31 UTC
[52/60] [abbrv] storm git commit: removed jstorm-on-yarn subdirectory
http://git-wip-us.apache.org/repos/asf/storm/blob/e1f68448/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/AuthUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/AuthUtils.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/AuthUtils.java
deleted file mode 100644
index b7eecfd..0000000
--- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/AuthUtils.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.alibaba.jstorm.yarn.thrift;
-
-import backtype.storm.Config;
-import javax.security.auth.login.Configuration;
-import javax.security.auth.login.AppConfigurationEntry;
-import java.security.NoSuchAlgorithmException;
-import java.security.URIParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Map;
-
-public class AuthUtils {
- private static final Logger LOG = LoggerFactory.getLogger(AuthUtils.class);
- public static final String LOGIN_CONTEXT_SERVER = "StormServer";
- public static final String LOGIN_CONTEXT_CLIENT = "StormClient";
- public static final String SERVICE = "storm_thrift_server";
-
- /**
- * Construct a JAAS configuration object from the login file named by "java.security.auth.login.config"
- * @param storm_conf Storm configuration
- * @return JAAS configuration object, or null when no login configuration file is configured
- */
- public static Configuration GetConfiguration(Map storm_conf) {
- Configuration login_conf = null;
-
- //find login file configuration from Storm configuration
- String loginConfigurationFile = (String)storm_conf.get("java.security.auth.login.config");
- if ((loginConfigurationFile != null) && (loginConfigurationFile.length()>0)) {
- try {
- URI config_uri = new File(loginConfigurationFile).toURI();
- login_conf = Configuration.getInstance("JavaLoginConfig", new URIParameter(config_uri));
- } catch (NoSuchAlgorithmException ex1) {
- if (ex1.getCause() instanceof FileNotFoundException)
- throw new RuntimeException("configuration file "+loginConfigurationFile+" could not be found", ex1); //keep the cause for diagnosis
- else throw new RuntimeException(ex1);
- } catch (Exception ex2) {
- throw new RuntimeException(ex2);
- }
- }
-
- return login_conf;
- }
-
- /**
- * Construct a transport plugin per storm configuration and prepare it with login_conf
- * @param storm_conf storm configuration naming the plugin class (Config.STORM_THRIFT_TRANSPORT_PLUGIN)
- * @return the prepared plugin; any failure is rethrown wrapped in a RuntimeException
- */
- public static ITransportPlugin GetTransportPlugin(Map storm_conf, Configuration login_conf) {
- ITransportPlugin transportPlugin = null;
- try {
- String transport_plugin_klassName = (String) storm_conf.get(Config.STORM_THRIFT_TRANSPORT_PLUGIN);
- Class klass = Class.forName(transport_plugin_klassName);
- transportPlugin = (ITransportPlugin)klass.newInstance();
- transportPlugin.prepare(storm_conf, login_conf);
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- return transportPlugin;
- }
-
- public static String get(Configuration configuration, String section, String key) throws IOException {
- AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section);
- if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+ section + "' entry in this configuration.";
- throw new IOException(errorMessage);
- }
-
- for(AppConfigurationEntry entry: configurationEntries) {
- Object val = entry.getOptions().get(key);
- if (val != null)
- return (String)val;
- }
- return null; //no entry of this section defines the key
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e1f68448/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ITransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ITransportPlugin.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ITransportPlugin.java
deleted file mode 100644
index 0c06bf1..0000000
--- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ITransportPlugin.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.alibaba.jstorm.yarn.thrift;
-
-import java.io.IOException;
-import java.util.Map;
-
-import javax.security.auth.login.Configuration;
-
-import org.apache.thrift7.TProcessor;
-import org.apache.thrift7.server.TServer;
-import org.apache.thrift7.transport.TTransport;
-import org.apache.thrift7.transport.TTransportException;
-
-/**
- * Interface for Thrift Transport plugin
- */
-public interface ITransportPlugin {
- /**
- * Invoked once immediately after construction
- * @param storm_conf Storm configuration
- * @param login_conf login configuration
- */
- void prepare(Map storm_conf, Configuration login_conf);
-
- /**
- * Create a server associated with a given port and service handler
- * @param port listening port
- * @param processor service handler
- * @return server to be bound to the given port
- */
- public TServer getServer(int port, TProcessor processor) throws IOException, TTransportException;
-
- /**
- * Connect to the specified server via framed transport
- * @param transport The underlying Thrift transport.
- * @param serverHost server host
- */
- public TTransport connect(TTransport transport, String serverHost) throws IOException, TTransportException;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e1f68448/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ReqContext.java
----------------------------------------------------------------------
diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ReqContext.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ReqContext.java
deleted file mode 100644
index ae10096..0000000
--- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ReqContext.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package com.alibaba.jstorm.yarn.thrift;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.net.InetAddress;
-import com.google.common.annotations.VisibleForTesting;
-import java.security.AccessControlContext;
-import java.security.AccessController;
-import java.security.Principal;
-import javax.security.auth.Subject;
-
-/**
- * Request context, which includes info about
- * (1) remote address,
- * (2) remote subject and primary principal
- * (3) request ID
- */
-public class ReqContext {
- private static final AtomicInteger uniqueId = new AtomicInteger(0);
- private Subject _subject;
- private InetAddress _remoteAddr;
- private Integer _reqID;
- private Map _storm_conf;
-
- /**
- * Get a request context associated with current thread
- * @return the calling thread's context, created on first access from its AccessControlContext
- */
- public static ReqContext context() {
- return ctxt.get();
- }
-
- //each thread will have its own request context
- private static final ThreadLocal < ReqContext > ctxt =
- new ThreadLocal < ReqContext > () {
- @Override
- protected ReqContext initialValue() {
- return new ReqContext(AccessController.getContext());
- }
- };
-
- //package-private constructor: takes the subject from acl_ctxt and assigns the next request ID
- @VisibleForTesting
- ReqContext(AccessControlContext acl_ctxt) {
- _subject = Subject.getSubject(acl_ctxt);
- _reqID = uniqueId.incrementAndGet();
- }
-
- /**
- * Set the client address
- */
- public void setRemoteAddress(InetAddress addr) {
- _remoteAddr = addr;
- }
-
- public InetAddress remoteAddress() {
- return _remoteAddr;
- }
-
- /**
- * Set remote subject explicitly
- */
- public void setSubject(Subject subject) {
- _subject = subject;
- }
-
- /**
- * Retrieve client subject associated with this request context
- */
- public Subject subject() {
- return _subject;
- }
-
- /**
- * The primary principal (first one returned) of the current subject, or null if there is no subject or principal
- */
- public Principal principal() {
- if (_subject == null) return null;
- Set<Principal> princs = _subject.getPrincipals();
- if (princs.size()==0) return null;
- return (Principal) (princs.toArray()[0]);
- }
-
- /**
- * request ID of this request
- */
- public Integer requestID() {
- return _reqID;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e1f68448/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/SimpleTransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/SimpleTransportPlugin.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/SimpleTransportPlugin.java
deleted file mode 100644
index 411a5ff..0000000
--- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/SimpleTransportPlugin.java
+++ /dev/null
@@ -1,107 +0,0 @@
-package com.alibaba.jstorm.yarn.thrift;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-import java.util.Map;
-
-import javax.security.auth.login.Configuration;
-import org.apache.thrift7.TException;
-import org.apache.thrift7.TProcessor;
-import org.apache.thrift7.protocol.TBinaryProtocol;
-import org.apache.thrift7.protocol.TProtocol;
-import org.apache.thrift7.server.THsHaServer;
-import org.apache.thrift7.server.TServer;
-import org.apache.thrift7.transport.TFramedTransport;
-import org.apache.thrift7.transport.TMemoryInputTransport;
-import org.apache.thrift7.transport.TNonblockingServerSocket;
-import org.apache.thrift7.transport.TSocket;
-import org.apache.thrift7.transport.TTransport;
-import org.apache.thrift7.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple transport for Thrift plugin.
- *
- * This plugin is designed to be backward compatible with existing Storm code.
- */
-public class SimpleTransportPlugin implements ITransportPlugin {
- protected Configuration login_conf;
- private static final Logger LOG = LoggerFactory.getLogger(SimpleTransportPlugin.class);
-
- /**
- * Invoked once immediately after construction
- * @param storm_conf Storm configuration (not read by this plugin)
- * @param login_conf login configuration
- */
- public void prepare(Map storm_conf, Configuration login_conf) {
- this.login_conf = login_conf;
- }
-
- /**
- * We will let Thrift apply its default transport factory
- */
- public TServer getServer(int port, TProcessor processor) throws IOException, TTransportException {
- TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
- THsHaServer.Args server_args = new THsHaServer.Args(serverTransport).
- processor(new SimpleWrapProcessor(processor)).
- workerThreads(64).
- protocolFactory(new TBinaryProtocol.Factory());
-
- //construct THsHaServer
- return new THsHaServer(server_args);
- }
-
- /**
- * Connect to the specified server via framed transport
- * @param transport The underlying Thrift transport; serverHost is not used by this plugin.
- */
- public TTransport connect(TTransport transport, String serverHost) throws TTransportException {
- //create a framed transport
- TTransport conn = new TFramedTransport(transport);
-
- //connect
- conn.open();
- LOG.debug("Simple client transport has been established");
-
- return conn;
- }
-
- /**
- * Processor that populates simple transport info into ReqContext, and then invokes a service handler
- */
- private class SimpleWrapProcessor implements TProcessor {
- final TProcessor wrapped;
-
- SimpleWrapProcessor(TProcessor wrapped) {
- this.wrapped = wrapped;
- }
-
- public boolean process(final TProtocol inProt, final TProtocol outProt) throws TException {
- //populating request context
- ReqContext req_context = ReqContext.context();
-
- TTransport trans = inProt.getTransport();
- if (trans instanceof TMemoryInputTransport) {
- try {
- req_context.setRemoteAddress(InetAddress.getLocalHost());
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- } else if (trans instanceof TSocket) {
- TSocket tsocket = (TSocket)trans;
- //remote address
- Socket socket = tsocket.getSocket();
- req_context.setRemoteAddress(socket.getInetAddress());
- }
-
- //anonymous user
- req_context.setSubject(null);
-
- //invoke service handler
- return wrapped.process(inProt, outProt);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e1f68448/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ThriftClient.java
----------------------------------------------------------------------
diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ThriftClient.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ThriftClient.java
deleted file mode 100644
index 940b16d..0000000
--- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ThriftClient.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package com.alibaba.jstorm.yarn.thrift;
-
-import java.io.IOException;
-import java.util.Map;
-import javax.security.auth.login.Configuration;
-import org.apache.thrift7.protocol.TBinaryProtocol;
-import org.apache.thrift7.protocol.TProtocol;
-import org.apache.thrift7.transport.TSocket;
-import org.apache.thrift7.transport.TTransport;
-import org.apache.thrift7.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import backtype.storm.utils.Utils;
-
-public class ThriftClient {
- private static final Logger LOG = LoggerFactory.getLogger(ThriftClient.class);
- private TTransport _transport;
- protected TProtocol _protocol;
-
- public ThriftClient(Map storm_conf, String host, int port) throws TTransportException {
- this(storm_conf, host, port, null); //null timeout: no explicit socket timeout is set
- }
-
- public ThriftClient(Map storm_conf, String host, int port, Integer timeout) throws TTransportException {
- try {
- //locate login configuration
- Configuration login_conf = AuthUtils.GetConfiguration(storm_conf);
-
- //construct a transport plugin
- ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(storm_conf, login_conf);
-
- //create a socket with server
- if(host==null) {
- throw new IllegalArgumentException("host is not set");
- }
- if(port<=0) {
- throw new IllegalArgumentException("invalid port: "+port);
- }
- TSocket socket = new TSocket(host, port);
- if(timeout!=null) {
- socket.setTimeout(timeout);
- }
- final TTransport underlyingTransport = socket;
-
- //establish client-server transport via plugin
- _transport = transportPlugin.connect(underlyingTransport, host);
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- _protocol = null; //stays null when the plugin returned no transport
- if (_transport != null)
- _protocol = new TBinaryProtocol(_transport);
- }
-
- public TTransport transport() {
- return _transport;
- }
-
- public void close() {
- if (_transport != null) _transport.close(); //constructor tolerates a null transport, so close() must too
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e1f68448/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ThriftServer.java b/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ThriftServer.java
deleted file mode 100644
index 1ab4311..0000000
--- a/jstorm-on-yarn/src/main/java/com/alibaba/jstorm/yarn/thrift/ThriftServer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package com.alibaba.jstorm.yarn.thrift;
-
-
-import java.util.Map;
-
-import javax.security.auth.login.Configuration;
-
-import org.apache.thrift7.TProcessor;
-import org.apache.thrift7.server.TServer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ThriftServer {
- private static final Logger LOG = LoggerFactory
- .getLogger(ThriftServer.class);
- private Map _storm_conf; // storm configuration
- protected TProcessor _processor = null;
- private int _port = 0;
- private TServer _server = null;
- private Configuration _login_conf;
-
- public ThriftServer(Map storm_conf, TProcessor processor, int port) {
- try {
- _storm_conf = storm_conf;
- _processor = processor;
- _port = port;
-
- // retrieve authentication configuration (a failure here is only logged; _login_conf then stays null)
- _login_conf = AuthUtils.GetConfiguration(_storm_conf);
- } catch (Exception x) {
- LOG.error(x.getMessage(), x);
- }
- }
-
- public void stop() {
- if (_server != null)
- _server.stop();
- }
-
- /**
- * Is ThriftServer listening to requests?
- *
- * @return false if no server has been created yet, otherwise whatever the server reports
- */
- public boolean isServing() {
- if (_server == null)
- return false;
- return _server.isServing();
- }
-
- public void serve() {
- try {
- // locate our thrift transport plugin
- ITransportPlugin transportPlugin = AuthUtils.GetTransportPlugin(
- _storm_conf, _login_conf);
-
- // server
- _server = transportPlugin.getServer(_port, _processor);
-
- // start accepting requests
- _server.serve();
- } catch (Exception ex) {
- LOG.error("ThriftServer is being stopped due to: " + ex, ex);
- if (_server != null)
- _server.stop();
- Runtime.getRuntime().halt(1); // shutdown server process since we
- // could not handle Thrift requests
- // any more
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e1f68448/jstorm-on-yarn/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/jstorm-on-yarn/src/main/resources/logback.xml b/jstorm-on-yarn/src/main/resources/logback.xml
deleted file mode 100644
index ac732b9..0000000
--- a/jstorm-on-yarn/src/main/resources/logback.xml
+++ /dev/null
@@ -1,46 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<configuration scan="true" scanPeriod="60 seconds">
- <appender name="A1" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>${logfile.name}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
- <fileNamePattern>${logfile.name}.%i</fileNamePattern>
- <minIndex>1</minIndex>
- <maxIndex>9</maxIndex>
- </rollingPolicy>
-
- <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
- <maxFileSize>100MB</maxFileSize>
- </triggeringPolicy>
-
- <encoder>
- <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
- </encoder>
- </appender>
-
- <appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>access.log</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
- <fileNamePattern>access.log.%i</fileNamePattern>
- <minIndex>1</minIndex>
- <maxIndex>9</maxIndex>
- </rollingPolicy>
-
- <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
- <maxFileSize>100MB</maxFileSize>
- </triggeringPolicy>
-
- <encoder>
- <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
- </encoder>
- </appender>
-
- <root level="INFO">
- <appender-ref ref="A1"/>
- </root>
-
- <logger name="backtype.storm.security.auth.authorizer" additivity="false">
- <level value="INFO" />
- <appender-ref ref="ACCESS" />
- </logger>
-</configuration>
http://git-wip-us.apache.org/repos/asf/storm/blob/e1f68448/jstorm-on-yarn/src/main/resources/master_defaults.yaml
----------------------------------------------------------------------
diff --git a/jstorm-on-yarn/src/main/resources/master_defaults.yaml b/jstorm-on-yarn/src/main/resources/master_defaults.yaml
deleted file mode 100644
index 314ba23..0000000
--- a/jstorm-on-yarn/src/main/resources/master_defaults.yaml
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright (c) 2013 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License. See accompanying LICENSE file.
-#
-master.host: "localhost"
-master.thrift.port: 9000
-master.initial-num-supervisors: 1
-master.container.priority: 0
-master.container.size-mb: 5120
-master.heartbeat.interval.millis: 1000
-master.timeout.secs: 1000
-yarn.report.wait.millis: 10000
-nimbusui.startup.ms: 10000
-
-ui.port: 7070
-
-storm.messaging.transport: "backtype.storm.messaging.netty.Context"
-storm.messaging.netty.buffer_size: 1048576
-storm.messaging.netty.max_retries: 100
-storm.messaging.netty.min_wait_ms: 1000
-storm.messaging.netty.max_wait_ms: 5000
-
-# Configuration parameter that allows the launching machine to specify the JAVA_HOME
-# used when the application is executed on the YARN cluster.
-#
-# storm.yarn.java_home: "/vol/anarchy/boneill/jdk1.7.0_40"
-
-# Configuration parameter that allows the launching machine to specify the yarn classpath
-# used when the application is executed on the YARN cluster. To find this value, run
-# "yarn classpath" on the target machines.
-#
-# storm.yarn.yarn_classpath: "/home/boneill/hadoop/etc/hadoop:/home/boneill/hadoop/etc/hadoop:/home/boneill/hadoop/etc/hadoop:/home/boneill/hadoop/share/hadoop/common/lib/*:/home/boneill/hadoop/share/hadoop/common/*:/home/boneill/hadoop/share/hadoop/hdfs:/home/boneill/hadoop/share/hadoop/hdfs/lib/*:/home/boneill/hadoop/share/hadoop/hdfs/*:/home/boneill/hadoop/share/hadoop/yarn/lib/*:/home/boneill/hadoop/share/hadoop/yarn/*:/home/boneill/hadoop/share/hadoop/mapreduce/lib/*:/home/boneill/hadoop/share/hadoop/mapreduce/*:/Users/bone/tools/hadoop/contrib/capacity-scheduler/*.jar:/home/boneill/hadoop/share/hadoop/yarn/*:/home/boneill/hadoop/share/hadoop/yarn/lib/*"
http://git-wip-us.apache.org/repos/asf/storm/blob/e1f68448/jstorm-on-yarn/src/test/java/com/taobao/jstorm/yarn/AppTest.java
----------------------------------------------------------------------
diff --git a/jstorm-on-yarn/src/test/java/com/taobao/jstorm/yarn/AppTest.java b/jstorm-on-yarn/src/test/java/com/taobao/jstorm/yarn/AppTest.java
deleted file mode 100644
index ec21b4b..0000000
--- a/jstorm-on-yarn/src/test/java/com/taobao/jstorm/yarn/AppTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.taobao.jstorm.yarn;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest
- extends TestCase
-{
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AppTest( String testName )
- {
- super( testName );
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite()
- {
- return new TestSuite( AppTest.class );
- }
-
- /**
- * Rigorous Test :-) (placeholder that only asserts true)
- */
- public void testApp()
- {
- assertTrue( true );
- }
-}