You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/08/02 08:02:57 UTC

git commit: FLUME-2109. HTTPS support in HTTP Source.

Updated Branches:
  refs/heads/trunk 5b5470bd5 -> e25661041


FLUME-2109. HTTPS support in HTTP Source.

(Ashish Paliwal via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e2566104
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e2566104
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e2566104

Branch: refs/heads/trunk
Commit: e25661041c2d478bf27d64d39241a9fce9a0d263
Parents: 5b5470b
Author: Hari Shreedharan <hs...@apache.org>
Authored: Thu Aug 1 22:46:38 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Thu Aug 1 22:46:38 2013 -0700

----------------------------------------------------------------------
 .../apache/flume/source/http/HTTPSource.java    |  56 +++++++++-
 .../http/HTTPSourceConfigurationConstants.java  |   5 +
 .../flume/source/http/TestHTTPSource.java       | 110 ++++++++++++++++++-
 flume-ng-core/src/test/resources/jettykeystore  | Bin 0 -> 1355 bytes
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   4 +
 5 files changed, 165 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
index c90f067..84ee33b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -29,6 +29,7 @@ import org.apache.flume.source.AbstractSource;
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.bio.SocketConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,16 +89,46 @@ public class HTTPSource extends AbstractSource implements
   private HTTPSourceHandler handler;
   private SourceCounter sourceCounter;
 
+  // SSL configuration variable
+  private volatile Integer sslPort;
+  private volatile String keyStorePath;
+  private volatile String keyStorePassword;
+  private volatile Boolean sslEnabled;
+
+
   @Override
   public void configure(Context context) {
     try {
+      // SSL related config
+      sslEnabled = context.getBoolean(HTTPSourceConfigurationConstants.SSL_ENABLED, false);
+
       port = context.getInteger(HTTPSourceConfigurationConstants.CONFIG_PORT);
       host = context.getString(HTTPSourceConfigurationConstants.CONFIG_BIND,
         HTTPSourceConfigurationConstants.DEFAULT_BIND);
-      checkHostAndPort();
+
+      Preconditions.checkState(host != null && !host.isEmpty(),
+                "HTTPSource hostname specified is empty");
+      // verify port only if its not ssl
+      if(!sslEnabled) {
+        Preconditions.checkNotNull(port, "HTTPSource requires a port number to be"
+                + " specified");
+      }
+
       String handlerClassName = context.getString(
               HTTPSourceConfigurationConstants.CONFIG_HANDLER,
               HTTPSourceConfigurationConstants.DEFAULT_HANDLER).trim();
+
+      if(sslEnabled) {
+        LOG.debug("SSL configuration enabled");
+        sslPort = context.getInteger(HTTPSourceConfigurationConstants.SSL_PORT);
+        Preconditions.checkArgument(sslPort != null && sslPort > 0, "SSL Port cannot be null or less than 0" );
+        keyStorePath = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE);
+        Preconditions.checkArgument(keyStorePath != null && !keyStorePath.isEmpty(),
+                                        "Keystore is required for SSL Conifguration" );
+        keyStorePassword = context.getString(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD);
+        Preconditions.checkArgument(keyStorePassword != null, "Keystore password is required for SSL Configuration");
+      }
+
       @SuppressWarnings("unchecked")
       Class<? extends HTTPSourceHandler> clazz =
               (Class<? extends HTTPSourceHandler>)
@@ -139,10 +170,25 @@ public class HTTPSource extends AbstractSource implements
             + " before I started one."
             + "Will not attempt to start.");
     srv = new Server();
-    SocketConnector connector = new SocketConnector();
-    connector.setPort(port);
-    connector.setHost(host);
-    srv.setConnectors(new Connector[] { connector });
+
+    // Connector Array
+    Connector[] connectors = new Connector[1];
+
+
+    if(sslEnabled) {
+      SslSocketConnector sslSocketConnector = new SslSocketConnector();
+      sslSocketConnector.setKeystore(keyStorePath);
+      sslSocketConnector.setKeyPassword(keyStorePassword);
+      sslSocketConnector.setPort(sslPort);
+      connectors[0] = sslSocketConnector;
+    } else {
+        SocketConnector connector = new SocketConnector();
+        connector.setPort(port);
+        connector.setHost(host);
+        connectors[0] = connector;
+    }
+
+    srv.setConnectors(connectors);
     try {
       org.mortbay.jetty.servlet.Context root =
               new org.mortbay.jetty.servlet.Context(

http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
index f547e0f..205aeab 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSourceConfigurationConstants.java
@@ -34,4 +34,9 @@ public class HTTPSourceConfigurationConstants {
   public static final String DEFAULT_HANDLER =
           "org.apache.flume.source.http.JSONHandler";
 
+  public static final String SSL_PORT = "sslPort";
+  public static final String SSL_KEYSTORE = "keystore";
+  public static final String SSL_KEYSTORE_PASSWORD = "keystorePassword";
+  public static final String SSL_ENABLED = "enableSSL";
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
index 8952db3..6c9fd86 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
@@ -22,11 +22,7 @@ import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import junit.framework.Assert;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
+import org.apache.flume.*;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
@@ -34,6 +30,7 @@ import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.JSONEvent;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.conn.ssl.SSLSocketFactory;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.junit.AfterClass;
@@ -41,10 +38,14 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import javax.net.ssl.*;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.ServerSocket;
+import java.net.URL;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -58,8 +59,12 @@ import static org.fest.reflect.core.Reflection.field;
 public class TestHTTPSource {
 
   private static HTTPSource source;
+  private static HTTPSource httpsSource;
+//  private static Channel httpsChannel;
+
   private static Channel channel;
   private static int selectedPort;
+  private static int sslPort;
   DefaultHttpClient httpClient;
   HttpPost postRequest;
 
@@ -77,9 +82,13 @@ public class TestHTTPSource {
     source = new HTTPSource();
     channel = new MemoryChannel();
 
+    httpsSource = new HTTPSource();
+//    httpsChannel = new MemoryChannel();
+
     Context ctx = new Context();
     ctx.put("capacity", "100");
     Configurables.configure(channel, ctx);
+//    Configurables.configure(httpsChannel, ctx);
 
     List<Channel> channels = new ArrayList<Channel>(1);
     channels.add(channel);
@@ -90,19 +99,43 @@ public class TestHTTPSource {
     source.setChannelProcessor(new ChannelProcessor(rcs));
 
     channel.start();
+
+    // Channel for HTTPS source
+//    List<Channel> sslChannels = new ArrayList<Channel>(1);
+//    channels.add(httpsChannel);
+//
+//    ChannelSelector sslRcs = new ReplicatingChannelSelector();
+//    rcs.setChannels(sslChannels);
+
+    httpsSource.setChannelProcessor(new ChannelProcessor(rcs));
+//    httpsChannel.start();
+
+    // HTTP context
     Context context = new Context();
 
     context.put("port", String.valueOf(selectedPort));
     context.put("host", "0.0.0.0");
 
+    // SSL context props
+    Context sslContext = new Context();
+    sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true");
+    sslPort = findFreePort();
+    sslContext.put(HTTPSourceConfigurationConstants.SSL_PORT, String.valueOf(sslPort));
+    sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password");
+    sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, "src/test/resources/jettykeystore");
+
     Configurables.configure(source, context);
+    Configurables.configure(httpsSource, sslContext);
     source.start();
+    httpsSource.start();
   }
 
   @AfterClass
   public static void tearDownClass() throws Exception {
     source.stop();
     channel.stop();
+    httpsSource.stop();
+//    httpsChannel.stop();
   }
 
   @Before
@@ -268,6 +301,73 @@ public class TestHTTPSource {
     return new ResultWrapper(resp, events);
   }
 
+  @Test
+  public void testHttps() throws Exception {
+    Type listType = new TypeToken<List<JSONEvent>>() {
+    }.getType();
+    List<JSONEvent> events = Lists.newArrayList();
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      Map<String, String> input = Maps.newHashMap();
+      for (int j = 0; j < 10; j++) {
+        input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i));
+      }
+      JSONEvent e = new JSONEvent();
+      e.setHeaders(input);
+      e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8"));
+      events.add(e);
+    }
+    Gson gson = new Gson();
+    String json = gson.toJson(events, listType);
+    HttpsURLConnection httpsURLConnection = null;
+    try {
+      TrustManager[] trustAllCerts = {new X509TrustManager() {
+        @Override
+        public void checkClientTrusted(
+          java.security.cert.X509Certificate[] x509Certificates, String s)
+          throws CertificateException {
+          // noop
+        }
+
+        @Override
+        public void checkServerTrusted(
+          java.security.cert.X509Certificate[] x509Certificates, String s)
+          throws CertificateException {
+          // noop
+        }
+
+        public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+          return null;
+        }
+      }};
+      SSLContext sc = SSLContext.getInstance("SSL");
+
+      HostnameVerifier hv = new HostnameVerifier() {
+        public boolean verify(String arg0, SSLSession arg1) {
+          return true;
+        }
+      };
+      sc.init(null, trustAllCerts, new SecureRandom());
+      HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+      HttpsURLConnection.setDefaultHostnameVerifier(
+        SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+      URL sslUrl = new URL("https://0.0.0.0:" + sslPort);
+      httpsURLConnection = (HttpsURLConnection) sslUrl.openConnection();
+      httpsURLConnection.setDoInput(true);
+      httpsURLConnection.setDoOutput(true);
+      httpsURLConnection.setRequestMethod("POST");
+      httpsURLConnection.getOutputStream().write(json.getBytes());
+
+      int statusCode = httpsURLConnection.getResponseCode();
+      Assert.assertEquals(200, statusCode);
+    } catch (Exception exception) {
+      Assert.fail("Exception not expected");
+      exception.printStackTrace();
+    } finally {
+      httpsURLConnection.disconnect();
+    }
+  }
+
   private void takeWithEncoding(String encoding, int n, List<JSONEvent> events)
           throws Exception{
     Transaction tx = channel.getTransaction();

http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-core/src/test/resources/jettykeystore
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/resources/jettykeystore b/flume-ng-core/src/test/resources/jettykeystore
new file mode 100644
index 0000000..db76bcb
Binary files /dev/null and b/flume-ng-core/src/test/resources/jettykeystore differ

http://git-wip-us.apache.org/repos/asf/flume/blob/e2566104/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index fb42528..c614991 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1223,6 +1223,10 @@ selector.type   replicating                                   replicating or mul
 selector.*                                                    Depends on the selector.type value
 interceptors    --                                            Space-separated list of interceptors
 interceptors.*
+enableSSL       false                                         Set this property to true to enable SSL
+sslPort                                                       The port to be used for SSL
+keystore                                                      Location of the keystore including keystore file name
+keystorePassword                                              Keystore password
 ==================================================================================================================================
 
 For example, a http source for agent named a1: