You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2017/04/25 23:43:22 UTC
incubator-metron git commit: METRON-883 Capture Bro Plugin
Enhancements from bro/bro-plugins (nickwallen) closes
apache/incubator-metron#545
Repository: incubator-metron
Updated Branches:
refs/heads/master fbce3b5f9 -> 19e0e715d
METRON-883 Capture Bro Plugin Enhancements from bro/bro-plugins (nickwallen) closes apache/incubator-metron#545
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/19e0e715
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/19e0e715
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/19e0e715
Branch: refs/heads/master
Commit: 19e0e715d30670612b0ad826ed133ba138bdc20f
Parents: fbce3b5
Author: nickwallen <ni...@nickallen.org>
Authored: Tue Apr 25 19:42:28 2017 -0400
Committer: nickallen <ni...@apache.org>
Committed: Tue Apr 25 19:42:28 2017 -0400
----------------------------------------------------------------------
metron-sensors/bro-plugin-kafka/README.md | 106 +++++++++++++------
.../bro-plugin-kafka/cmake/FindLibRDKafka.cmake | 30 +++---
.../bro-plugin-kafka/cmake/FindOpenSSL.cmake | 2 +
.../scripts/Bro/Kafka/__load__.bro | 2 +
.../scripts/Bro/Kafka/logs-to-kafka.bro | 3 +-
.../bro-plugin-kafka/src/KafkaWriter.cc | 85 +++++++++------
.../bro-plugin-kafka/src/KafkaWriter.h | 15 +++
7 files changed, 163 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/README.md b/metron-sensors/bro-plugin-kafka/README.md
index 6d0582e..31b1f54 100644
--- a/metron-sensors/bro-plugin-kafka/README.md
+++ b/metron-sensors/bro-plugin-kafka/README.md
@@ -1,60 +1,98 @@
-Bro Logging Output to Kafka
+Logging Bro Output to Kafka
===========================
-A Bro log writer that sends logging output to Kafka. This provides a convenient
-means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to
-process the data generated by Bro.
+A Bro log writer that sends logging output to Kafka. This provides a convenient means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to process the data generated by Bro.
Installation
------------
-Install librdkafka (https://github.com/edenhill/librdkafka), a native client
-library for Kafka. This plugin has been tested against the latest release of
-librdkafka, which at the time of this writing is v0.9.4. In order to support interacting
-with a kerberized kafka, you will need libsasl2 installed
+1. Install [librdkafka](https://github.com/edenhill/librdkafka), a native client library for Kafka. This plugin has been tested against the latest release of librdkafka, which at the time of this writing is v0.9.4.
-```
-# curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
-# cd librdkafka-0.9.4/
-# ./configure --enable-sasl
-# make
-# sudo make install
-```
+ In order to use this plugin within a kerberized Kafka environment, you will also need `libsasl2` installed and will need to pass `--enable-sasl` to the `configure` script.
-Then compile this Bro plugin using the following commands.
+ ```
+ curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz
+ cd librdkafka-0.9.4/
+ ./configure --enable-sasl
+ make
+ sudo make install
+ ```
-```
-# ./configure --bro-dist=$BRO_SRC
-# make
-# sudo make install
-```
+1. Build the plugin using the following commands.
-Run the following command to ensure that the plugin was installed successfully.
+ ```
+ ./configure --bro-dist=$BRO_SRC
+ make
+ sudo make install
+ ```
-```
-# bro -N Bro::Kafka
-Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
-```
+1. Run the following command to ensure that the plugin was installed successfully.
+
+ ```
+ $ bro -N Bro::Kafka
+ Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
+ ```
Activation
----------
-The easiest way to enable Kafka output is to load the plugin's
-`logs-to-kafka.bro` script. If you are using BroControl, the following lines
-added to local.bro will activate it.
+The following examples highlight different ways that the plugin can be used. Simply add Bro script to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) as shown to activate the plugin.
+
+### Example 1
+
+The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`.
+ * Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table.
+ * By defining `topic_name` all records will be sent to the same Kafka topic.
```
@load Bro/Kafka/logs-to-kafka.bro
-redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
+redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
redef Kafka::topic_name = "bro";
redef Kafka::kafka_conf = table(
["metadata.broker.list"] = "localhost:9092"
);
```
-This example will send all HTTP, DNS, and Conn logs to a Kafka broker running on
-the localhost to a topic called `bro`. Any configuration value accepted by
-librdkafka can be added to the `kafka_conf` configuration table.
+### Example 2
+
+It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`.
+ * The `topic_name` value must be set to an empty string.
+ * The `$path` value of Bro's Log Writer mechanism is used to define the topic name.
+ * Any configuration value accepted by librdkafka can be added to the `$config` configuration table.
+ * Each log writer accepts a separate configuration table.
+
+```
+@load Bro/Kafka/logs-to-kafka.bro
+redef Kafka::topic_name = "";
+redef Kafka::tag_json = T;
+
+event bro_init()
+{
+ # handles HTTP
+ local http_filter: Log::Filter = [
+ $name = "kafka-http",
+ $writer = Log::WRITER_KAFKAWRITER,
+ $config = table(
+ ["stream_id"] = "HTTP::LOG",
+ ["metadata.broker.list"] = "localhost:9092"
+ ),
+ $path = "http"
+ ];
+ Log::add_filter(HTTP::LOG, http_filter);
+
+ # handles DNS
+ local dns_filter: Log::Filter = [
+ $name = "kafka-dns",
+ $writer = Log::WRITER_KAFKAWRITER,
+ $config = table(
+ ["stream_id"] = "DNS::LOG",
+ ["metadata.broker.list"] = "localhost:9092"
+ ),
+ $path = "dns"
+ ];
+ Log::add_filter(DNS::LOG, dns_filter);
+}
+```
Settings
--------
@@ -147,7 +185,7 @@ For an environment where the following is true:
The kafka topic `bro` has been given permission for the `metron` user to
write:
```
-# login using the metron user
+# login using the metron user
kinit -kt /etc/security/keytabs/metron.headless.keytab metron@EXAMPLE.COM
${KAFKA_HOME}/kafka-broker/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:metron --topic bro
```
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake b/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
index c64d8f9..904bfff 100644
--- a/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
+++ b/metron-sensors/bro-plugin-kafka/cmake/FindLibRDKafka.cmake
@@ -16,34 +16,36 @@
#
find_path(LibRDKafka_ROOT_DIR
- NAMES include/librdkafka/rdkafkacpp.h
+ NAMES include/librdkafka/rdkafkacpp.h
)
find_library(LibRDKafka_LIBRARIES
- NAMES rdkafka++
- HINTS ${LibRDKafka_ROOT_DIR}/lib
+ NAMES rdkafka++
+ HINTS ${LibRDKafka_ROOT_DIR}/lib
+ PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
)
find_library(LibRDKafka_C_LIBRARIES
- NAMES rdkafka
- HINTS ${LibRDKafka_ROT_DIR}/lib
+ NAMES rdkafka
+ HINTS ${LibRDKafka_ROT_DIR}/lib
+ PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
)
find_path(LibRDKafka_INCLUDE_DIR
- NAMES librdkafka/rdkafkacpp.h
- HINTS ${LibRDKafka_ROOT_DIR}/include
+ NAMES librdkafka/rdkafkacpp.h
+ HINTS ${LibRDKafka_ROOT_DIR}/include
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(LibRDKafka DEFAULT_MSG
- LibRDKafka_LIBRARIES
- LibRDKafka_C_LIBRARIES
- LibRDKafka_INCLUDE_DIR
+ LibRDKafka_LIBRARIES
+ LibRDKafka_C_LIBRARIES
+ LibRDKafka_INCLUDE_DIR
)
mark_as_advanced(
- LibRDKafka_ROOT_DIR
- LibRDKafka_LIBRARIES
- LibRDKafka_C_LIBRARIES
- LibRDKafka_INCLUDE_DIR
+ LibRDKafka_ROOT_DIR
+ LibRDKafka_LIBRARIES
+ LibRDKafka_C_LIBRARIES
+ LibRDKafka_INCLUDE_DIR
)
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake b/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
index 5ed955c..58af5c7 100644
--- a/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
+++ b/metron-sensors/bro-plugin-kafka/cmake/FindOpenSSL.cmake
@@ -47,11 +47,13 @@ find_path(OpenSSL_INCLUDE_DIR
find_library(OpenSSL_SSL_LIBRARY
NAMES ssl ssleay32 ssleay32MD
HINTS ${OpenSSL_ROOT_DIR}/lib
+ PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
)
find_library(OpenSSL_CRYPTO_LIBRARY
NAMES crypto
HINTS ${OpenSSL_ROOT_DIR}/lib
+ PATH_SUFFIXES ${CMAKE_LIBRARY_ARCHITECTURE}
)
set(OpenSSL_LIBRARIES ${OpenSSL_SSL_LIBRARY} ${OpenSSL_CRYPTO_LIBRARY}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
index 12295a9..1df1136 100644
--- a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
+++ b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
@@ -17,3 +17,5 @@
# This is loaded when a user activates the plugin. Include scripts here that should be
# loaded automatically at that point.
#
+
+@load ./init.bro
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
index 84e390c..d62e03f 100644
--- a/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
+++ b/metron-sensors/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
@@ -35,7 +35,8 @@ event bro_init() &priority=-5
{
local filter: Log::Filter = [
$name = fmt("kafka-%s", stream_id),
- $writer = Log::WRITER_KAFKAWRITER
+ $writer = Log::WRITER_KAFKAWRITER,
+ $config = table(["stream_id"] = fmt("%s", stream_id))
];
Log::add_filter(stream_id, filter);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
index 79a85ed..951a60c 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
@@ -22,9 +22,35 @@ using namespace writer;
KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL)
{
- // TODO do we need this??
- topic_name.assign((const char*)BifConst::Kafka::topic_name->Bytes(),
- BifConst::Kafka::topic_name->Len());
+ // need thread-local copies of all user-defined settings coming from
+ // bro scripting land. accessing these is not thread-safe and 'DoInit'
+ // is potentially accessed from multiple threads.
+
+ // tag_json - thread local copy
+ tag_json = BifConst::Kafka::tag_json;
+
+ // topic name - thread local copy
+ topic_name.assign(
+ (const char*)BifConst::Kafka::topic_name->Bytes(),
+ BifConst::Kafka::topic_name->Len());
+
+ // kafka_conf - thread local copy
+ Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
+ IterCookie* c = val->AsTable()->InitForIteration();
+ HashKey* k;
+ TableEntryVal* v;
+ while ((v = val->AsTable()->NextEntry(k, c))) {
+
+ // fetch the key and value
+ ListVal* index = val->AsTableVal()->RecoverIndex(k);
+ string key = index->Index(0)->AsString()->CheckString();
+ string val = v->Value()->AsString()->CheckString();
+ kafka_conf.insert (kafka_conf.begin(), pair<string, string> (key, val));
+
+ // cleanup
+ Unref(index);
+ delete k;
+ }
}
KafkaWriter::~KafkaWriter()
@@ -32,6 +58,11 @@ KafkaWriter::~KafkaWriter()
bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
{
+ // if no global 'topic_name' is defined, use the log stream's 'path'
+ if(topic_name.empty()) {
+ topic_name = info.path;
+ }
+
// initialize the formatter
if(BifConst::Kafka::tag_json) {
formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
@@ -39,8 +70,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
}
- // kafka global configuration
- string err;
+ // is debug enabled
string debug;
debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len());
bool is_debug(!debug.empty());
@@ -53,41 +83,31 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
else {
reporter->Info( "Debug is turned off.");
}
+
+ // kafka global configuration
+ string err;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// apply the user-defined settings to kafka
- Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
- IterCookie* c = val->AsTable()->InitForIteration();
- HashKey* k;
- TableEntryVal* v;
- while ((v = val->AsTable()->NextEntry(k, c))) {
-
- // fetch the key and value
- ListVal* index = val->AsTableVal()->RecoverIndex(k);
- string key = index->Index(0)->AsString()->CheckString();
- string val = v->Value()->AsString()->CheckString();
-
- if(is_debug) {
- reporter->Info("Setting '%s'='%s'", key.c_str(), val.c_str());
- }
- // apply setting to kafka
- if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
- reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
- return false;
- }
-
- // cleanup
- Unref(index);
- delete k;
+ map<string,string>::iterator i;
+ for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
+ string key = i->first;
+ string val = i->second;
+
+ // apply setting to kafka
+ if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+ reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
+ return false;
+ }
}
if(is_debug) {
string key("debug");
string val(debug);
- if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+ if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
return false;
- }
+ }
}
// create kafka producer
@@ -104,9 +124,11 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
reporter->Error("Failed to create topic handle: %s", err.c_str());
return false;
}
+
if(is_debug) {
reporter->Info("Successfully created producer.");
}
+
return true;
}
@@ -130,8 +152,9 @@ bool KafkaWriter::DoFinish(double network_time)
// successful only if all messages delivered
if (producer->outq_len() == 0) {
- reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
success = true;
+ } else {
+ reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
}
delete topic;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/19e0e715/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
index 7e77bc0..ad3e03f 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.h
@@ -28,6 +28,17 @@
#include "kafka.bif.h"
#include "TaggedJSON.h"
+namespace RdKafka {
+ class Conf;
+ class Producer;
+ class Topic;
+}
+
+namespace threading {
+ namespace formatter {
+ class Formatter;
+}}
+
namespace logging { namespace writer {
/**
@@ -54,6 +65,10 @@ protected:
virtual bool DoHeartbeat(double network_time, double current_time);
private:
+ static const string default_topic_key;
+ string stream_id;
+ bool tag_json;
+ map<string, string> kafka_conf;
string topic_name;
threading::formatter::Formatter *formatter;
RdKafka::Producer* producer;