You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/08/02 08:02:57 UTC
git commit: FLUME-2109. HTTPS support in HTTP Source.
Updated Branches:
refs/heads/trunk 5b5470bd5 -> e25661041
FLUME-2109. HTTPS support in HTTP Source.
(Ashish Paliwal via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e2566104
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e2566104
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e2566104
Branch: refs/heads/trunk
Commit: e25661041c2d478bf27d64d39241a9fce9a0d263
Parents: 5b5470b
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Aug 1 22:46:38 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Aug 1 22:46:38 2013 -0700
----------------------------------------------------------------------
.../apache/flume/source/http/HTTPSource.java | 56 +++++++++-
.../http/HTTPSourceConfigurationConstants.java | 5 +
.../flume/source/http/TestHTTPSource.java | 110 ++++++++++++++++++-
flume-ng-core/src/test/resources/jettykeystore | Bin 0 -> 1355 bytes
flume-ng-doc/sphinx/FlumeUserGuide.rst | 4 +
5 files changed, 165 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
index c90f067..84ee33b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -29,6 +29,7 @@ import org.apache.flume.source.AbstractSource;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.bio.SocketConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
import org.mortbay.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,16 +89,46 @@ public class HTTPSource extends AbstractSource implements
private HTTPSourceHandler handler;
private SourceCounter sourceCounter;
+ // SSL configuration variable
+ private volatile Integer sslPort;
+ private volatile String keyStorePath;
+ private volatile String keyStorePassword;
+ private volatile Boolean sslEnabled;
+
+
@Override
public void configure(Context context) {
try {
+ // SSL related config
+ sslEnabled = context.getBoolean(HTTPSourceConfigurationConstants.SSL_ENABLED, false);
+
port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
host = context.getString(HTTPSourceConfigurationConstants.CONFIG_BIND,
HTTPSourceConfigurationConstants.DEFAULT_BIND);
- checkHostAndPort();
+
+ Preconditions.checkState(host != null && !host.isEmpty(),
+ "HTTPSource hostname specified is empty");
+ // verify port only if its not ssl
+ if(!sslEnabled) {
+ Preconditions.checkNotNull(port, "HTTPSource requires a port number to be"
+ + " specified");
+ }
+
String handlerClassName = context.getString(
HTTPSourceConfigurationConstants.CONFIG_HANDLER,
HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim();
+
+ if(sslEnabled) {
+ LOG.debug("SSL configuration enabled");
+ sslPort = context.getInteger(HTTPSourceConfigurationConstants.SSL_PORT);
+ Preconditions.checkArgument(sslPort != null && sslPort > 0, "SSL Port cannot be null or less than 0" );
+ keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE);
+ Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(),
+ "Keystore is required for SSL Conifguration" );
+ keyStorePassword = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD);
+ Preconditions.checkArgument(keyStorePassword != null, "Keystore password is required for SSL Configuration");
+ }
+
@SuppressWarnings("unchecked")
Class<? extends HTTPSourceHandler> clazz =
(Class<? extends HTTPSourceHandler>)
@@ -139,10 +170,25 @@ public class HTTPSource extends AbstractSource implements
+ " before I started one."
+ "Will not attempt to start.");
srv = new Server();
- SocketConnector connector = new SocketConnector();
- connector.setPort(port);
- connector.setHost(host);
- srv.setConnectors(new Connector[] { connector });
+
+ // Connector Array
+ Connector[] connectors = new Connector[1];
+
+
+ if(sslEnabled) {
+ SslSocketConnector sslSocketConnector = new SslSocketConnector();
+ sslSocketConnector.setKeystore(keyStorePath);
+ sslSocketConnector.setKeyPassword(keyStorePassword);
+ sslSocketConnector.setPort(sslPort);
+ connectors[0] = sslSocketConnector;
+ } else {
+ SocketConnector connector = new SocketConnector();
+ connector.setPort(port);
+ connector.setHost(host);
+ connectors[0] = connector;
+ }
+
+ srv.setConnectors(connectors);
try {
org.mortbay.jetty.servlet.Context root =
new org.mortbay.jetty.servlet.Context(
http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
index f547e0f..205aeab 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
@@ -34,4 +34,9 @@ public class HTTPSourceConfigurationConstants {
public static final String DEFAULT_HANDLER =
"org.apache.flume.source.http.JSONHandler";
+ public static final String SSL_PORT = "sslPort";
+ public static final String SSL_KEYSTORE = "keystore";
+ public static final String SSL_KEYSTORE_PASSWORD = "keystorePassword";
+ public static final String SSL_ENABLED = "enableSSL";
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
index 8952db3..6c9fd86 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
@@ -22,11 +22,7 @@ import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import junit.framework.Assert;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
+import org.apache.flume.*;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
@@ -34,6 +30,7 @@ import org.apache.flume.conf.Configurables;
import org.apache.flume.event.JSONEvent;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.junit.AfterClass;
@@ -41,10 +38,14 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import javax.net.ssl.*;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.ServerSocket;
+import java.net.URL;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -58,8 +59,12 @@ import static org.fest.reflect.core.Reflection.field;
public class TestHTTPSource {
private static HTTPSource source;
+ private static HTTPSource httpsSource;
+// private static Channel httpsChannel;
+
private static Channel channel;
private static int selectedPort;
+ private static int sslPort;
DefaultHttpClient httpClient;
HttpPost postRequest;
@@ -77,9 +82,13 @@ public class TestHTTPSource {
source = new HTTPSource();
channel = new MemoryChannel();
+ httpsSource = new HTTPSource();
+// httpsChannel = new MemoryChannel();
+
Context ctx = new Context();
ctx.put("capacity", "100");
Configurables.configure(channel, ctx);
+// Configurables.configure(httpsChannel, ctx);
List<Channel> channels = new ArrayList<Channel>(1);
channels.add(channel);
@@ -90,19 +99,43 @@ public class TestHTTPSource {
source.setChannelProcessor(new ChannelProcessor(rcs));
channel.start();
+
+ // Channel for HTTPS source
+// List<Channel> sslChannels = new ArrayList<Channel>(1);
+// channels.add(httpsChannel);
+//
+// ChannelSelector sslRcs = new ReplicatingChannelSelector();
+// rcs.setChannels(sslChannels);
+
+ httpsSource.setChannelProcessor(new ChannelProcessor(rcs));
+// httpsChannel.start();
+
+ // HTTP context
Context context = new Context();
context.put("port", String.valueOf(selectedPort));
context.put("host", "0.0.0.0");
+ // SSL context props
+ Context sslContext = new Context();
+ sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true");
+ sslPort = findFreePort();
+ sslContext.put(HTTPSourceConfigurationConstants.SSL_PORT, String.valueOf(sslPort));
+ sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password");
+ sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, "src/test/resources/jettykeystore");
+
Configurables.configure(source, context);
+ Configurables.configure(httpsSource, sslContext);
source.start();
+ httpsSource.start();
}
@AfterClass
public static void tearDownClass() throws Exception {
source.stop();
channel.stop();
+ httpsSource.stop();
+// httpsChannel.stop();
}
@Before
@@ -268,6 +301,73 @@ public class TestHTTPSource {
return new ResultWrapper(resp, events);
}
+  /**
+   * Posts a JSON-encoded batch of ten events to the SSL-enabled source over
+   * HTTPS and asserts that the source answers with HTTP 200.
+   *
+   * The client accepts every certificate and every host name (presumably
+   * because the test keystore does not chain to a trusted CA - confirm).
+   * That relaxed trust is installed on this single connection only, so the
+   * JVM-wide HTTPS defaults stay untouched for other tests.
+   *
+   * @throws Exception if the request cannot be built or sent; the failure is
+   *     propagated unchanged so the test report carries the real stack trace
+   */
+  @Test
+  public void testHttps() throws Exception {
+    Type listType = new TypeToken<List<JSONEvent>>() {
+    }.getType();
+    List<JSONEvent> events = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      Map<String, String> input = Maps.newHashMap();
+      for (int j = 0; j < 10; j++) {
+        input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i));
+      }
+      JSONEvent e = new JSONEvent();
+      e.setHeaders(input);
+      e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8"));
+      events.add(e);
+    }
+    Gson gson = new Gson();
+    String json = gson.toJson(events, listType);
+
+    // Trust manager that performs no certificate validation at all.
+    TrustManager[] trustAllCerts = {new X509TrustManager() {
+      @Override
+      public void checkClientTrusted(
+          java.security.cert.X509Certificate[] x509Certificates, String s)
+          throws CertificateException {
+        // noop
+      }
+
+      @Override
+      public void checkServerTrusted(
+          java.security.cert.X509Certificate[] x509Certificates, String s)
+          throws CertificateException {
+        // noop
+      }
+
+      @Override
+      public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+        // Return an empty array rather than null so callers can iterate safely.
+        return new java.security.cert.X509Certificate[0];
+      }
+    }};
+    SSLContext sc = SSLContext.getInstance("SSL");
+    sc.init(null, trustAllCerts, new SecureRandom());
+
+    // Accept whatever host name the server certificate carries.
+    HostnameVerifier hv = new HostnameVerifier() {
+      @Override
+      public boolean verify(String arg0, SSLSession arg1) {
+        return true;
+      }
+    };
+
+    HttpsURLConnection httpsURLConnection = null;
+    try {
+      URL sslUrl = new URL("https://0.0.0.0:" + sslPort);
+      httpsURLConnection = (HttpsURLConnection) sslUrl.openConnection();
+      // Per-connection settings: do not replace the JVM-wide defaults.
+      httpsURLConnection.setSSLSocketFactory(sc.getSocketFactory());
+      httpsURLConnection.setHostnameVerifier(hv);
+      httpsURLConnection.setDoInput(true);
+      httpsURLConnection.setDoOutput(true);
+      httpsURLConnection.setRequestMethod("POST");
+      // Same explicit charset as the event bodies built above.
+      httpsURLConnection.getOutputStream().write(json.getBytes("UTF-8"));
+
+      int statusCode = httpsURLConnection.getResponseCode();
+      Assert.assertEquals(200, statusCode);
+    } finally {
+      // The connection is still null if opening it failed; guard so a
+      // NullPointerException does not mask the original failure.
+      if (httpsURLConnection != null) {
+        httpsURLConnection.disconnect();
+      }
+    }
+  }
+
private void takeWithEncoding(String encoding, int n, List<JSONEvent> events)
throws Exception{
Transaction tx = channel.getTransaction();
http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/test/resources/jettykeystore
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/resources/jettykeystore b/flume-ng-core/src/test/resources/jettykeystore
new file mode 100644
index 0000000..db76bcb
Binary files /dev/null and b/flume-ng-core/src/test/resources/jettykeystore differ
http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index fb42528..c614991 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1223,6 +1223,10 @@ selector.type replicating replicating or mul
selector.* Depends on the selector.type value
interceptors -- Space-separated list of interceptors
interceptors.*
+enableSSL false Set this property to true to enable SSL
+sslPort The port to be used for SSL
+keystore Location of the keystore, including the keystore file name
+keystorePassword Keystore password
==================================================================================================================================
For example, a http source for agent named a1: