You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by jo...@apache.org on 2017/06/01 15:29:42 UTC

metron git commit: METRON-858 bro-plugin-kafka is throwing segfaults (JonZeolla) closes apache/metron#547

Repository: metron
Updated Branches:
  refs/heads/master 8779eb3fe -> 85872bd68


METRON-858 bro-plugin-kafka is throwing segfaults (JonZeolla) closes apache/metron#547


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/85872bd6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/85872bd6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/85872bd6

Branch: refs/heads/master
Commit: 85872bd68698149692c97a48dfe41a78435dcc99
Parents: 8779eb3
Author: JonZeolla <ze...@gmail.com>
Authored: Thu Jun 1 11:28:42 2017 -0400
Committer: jonzeolla <jo...@apache.org>
Committed: Thu Jun 1 11:28:42 2017 -0400

----------------------------------------------------------------------
 metron-sensors/bro-plugin-kafka/README.md       | 56 ++++++++++++++++++--
 .../bro-plugin-kafka/configure.plugin           |  2 +-
 .../bro-plugin-kafka/src/KafkaWriter.cc         | 21 ++++----
 3 files changed, 63 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/85872bd6/metron-sensors/bro-plugin-kafka/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/README.md b/metron-sensors/bro-plugin-kafka/README.md
index 31b1f54..e219360 100644
--- a/metron-sensors/bro-plugin-kafka/README.md
+++ b/metron-sensors/bro-plugin-kafka/README.md
@@ -36,13 +36,14 @@ Installation
 Activation
 ----------
 
-The following examples highlight different ways that the plugin can be used.  Simply add Bro script to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) as shown to activate the plugin.
+The following examples highlight different ways that the plugin can be used.  Simply add the Bro script language to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) as shown to demonstrate the example.
 
 ### Example 1
 
 The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`. 
  * Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table.  
  * By defining `topic_name` all records will be sent to the same Kafka topic.
+ * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent.
 
 ```
 @load Bro/Kafka/logs-to-kafka.bro
@@ -73,7 +74,6 @@ event bro_init()
         $name = "kafka-http",
         $writer = Log::WRITER_KAFKAWRITER,
         $config = table(
-                ["stream_id"] = "HTTP::LOG",
                 ["metadata.broker.list"] = "localhost:9092"
         ),
         $path = "http"
@@ -85,7 +85,6 @@ event bro_init()
         $name = "kafka-dns",
         $writer = Log::WRITER_KAFKAWRITER,
         $config = table(
-                ["stream_id"] = "DNS::LOG",
                 ["metadata.broker.list"] = "localhost:9092"
         ),
         $path = "dns"
@@ -94,6 +93,57 @@ event bro_init()
 }
 ```
 
+### Example 3
+
+You may want to configure bro to filter log messages with certain characteristics from being sent to your kafka topics.  For instance, Metron currently doesn't support IPv6 source or destination IPs in the default enrichments, so it may be helpful to filter those log messages from being sent to kafka (although there are [multiple ways](#notes) to approach this).  In this example we will do just that, assuming a somewhat standard bro kafka plugin configuration, such that:
+ * All bro logs are sent to the `bro` topic, by configuring `Kafka::topic_name`.
+ * Each JSON message is tagged with the appropriate log type (such as `http`, `dns`, or `conn`), by setting `tag_json` to true.
+ * If the log message contains a 128 bit long (i.e. IPv6) source or destination IP address, the log is not sent to kafka.
+
+```
+@load Bro/Kafka/logs-to-kafka.bro
+redef Kafka::topic_name = "bro";
+redef Kafka::tag_json = T;
+
+event bro_init() &priority=-5
+{
+    # handles HTTP
+    Log::add_filter(HTTP::LOG, [
+        $name = "kafka-http",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $pred(rec: HTTP::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); },
+        $config = table(
+            ["metadata.broker.list"] = "localhost:9092"
+        )
+    ]);
+
+    # handles DNS
+    Log::add_filter(DNS::LOG, [
+        $name = "kafka-dns",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $pred(rec: DNS::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); },
+        $config = table(
+            ["metadata.broker.list"] = "localhost:9092"
+        )
+    ]);
+
+    # handles Conn
+    Log::add_filter(Conn::LOG, [
+        $name = "kafka-conn",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $pred(rec: Conn::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); },
+        $config = table(
+            ["metadata.broker.list"] = "localhost:9092"
+        )
+    ]);
+}
+```
+
+#### Notes
+ * `logs_to_send` is mutually exclusive with `$pred`, thus for each log you want to set `$pred` on, you must individually set up a `Log::add_filter` and refrain from including that log in `logs_to_send`.
+ * You can also filter IPv6 logs from within your Metron cluster [using Stellar](../../metron-platform/metron-common#IS_IP).  In that case, you wouldn't apply a predicate in your bro configuration, and instead Stellar would filter the logs out before they were processed by the enrichment layer of Metron.
+ * It is also possible to use the `is_v6_subnet()` bro function in your predicate, as of their [2.5 release](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-5), however the above example should work on [bro 2.4](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-4) and newer, which has been the focus of the kafka plugin.
+
 Settings
 --------
 

http://git-wip-us.apache.org/repos/asf/metron/blob/85872bd6/metron-sensors/bro-plugin-kafka/configure.plugin
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/configure.plugin b/metron-sensors/bro-plugin-kafka/configure.plugin
index 1cb2086..c7e6662 100644
--- a/metron-sensors/bro-plugin-kafka/configure.plugin
+++ b/metron-sensors/bro-plugin-kafka/configure.plugin
@@ -31,7 +31,7 @@ plugin_option()
 {
   case "$1" in
     --with-librdkafka=*)
-      append_cache_entry LibRdKafka_ROOT_DIR PATH $optarg
+      append_cache_entry LibRDKafka_ROOT_DIR PATH $optarg
       ;;
     --with-openssl=*)
       append_cache_entry OpenSSL_ROOT_DIR PATH $optarg

http://git-wip-us.apache.org/repos/asf/metron/blob/85872bd6/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
----------------------------------------------------------------------
diff --git a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
index 951a60c..c9ad44f 100644
--- a/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
+++ b/metron-sensors/bro-plugin-kafka/src/KafkaWriter.cc
@@ -75,13 +75,10 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
     debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len());
     bool is_debug(!debug.empty());
     if(is_debug) {
-      reporter->Info( "Debug is turned on and set to: %s.  Available debug context: %s."
-                     , debug.c_str()
-                     , RdKafka::get_debug_contexts().c_str()
-                     );
+      MsgThread::Info(Fmt("Debug is turned on and set to: %s.  Available debug context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str()));
     }
     else {
-      reporter->Info( "Debug is turned off.");
+      MsgThread::Info(Fmt("Debug is turned off."));
     }
 
     // kafka global configuration
@@ -96,7 +93,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
 
       // apply setting to kafka
       if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-          reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
+          Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()));
           return false;
       }
     }
@@ -105,7 +102,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
         string key("debug");
         string val(debug);
         if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
-            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
+            Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()));
             return false;
         }
     }
@@ -113,7 +110,7 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
     // create kafka producer
     producer = RdKafka::Producer::create(conf, err);
     if (!producer) {
-        reporter->Error("Failed to create producer: %s", err.c_str());
+        Error(Fmt("Failed to create producer: %s", err.c_str()));
         return false;
     }
 
@@ -121,12 +118,12 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading
     topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
     topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
     if (!topic) {
-        reporter->Error("Failed to create topic handle: %s", err.c_str());
+        Error(Fmt("Failed to create topic handle: %s", err.c_str()));
         return false;
     }
 
     if(is_debug) {
-        reporter->Info("Successfully created producer.");
+        MsgThread::Info(Fmt("Successfully created producer."));
     }
 
     return true;
@@ -154,7 +151,7 @@ bool KafkaWriter::DoFinish(double network_time)
     if (producer->outq_len() == 0) {
         success = true;
     } else {
-        reporter->Error("Unable to deliver %0d message(s)", producer->outq_len());
+        Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
     }
 
     delete topic;
@@ -187,7 +184,7 @@ bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields,
     }
     else {
         string err = RdKafka::err2str(resp);
-        reporter->Error("Kafka send failed: %s", err.c_str());
+        Error(Fmt("Kafka send failed: %s", err.c_str()));
     }
 
     return true;