You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:06:15 UTC
[33/43] incubator-metron git commit: METRON-64 Fixed
'dns/Log::WRITER_KAFKAWRITER' Error (nickwallen via cestella) closes
apache/incubator-metron#43
METRON-64 Fixed 'dns/Log::WRITER_KAFKAWRITER' Error (nickwallen via cestella) closes apache/incubator-metron#43
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/75666533
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/75666533
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/75666533
Branch: refs/heads/Metron_0.1BETA
Commit: 75666533c74befd2de4c25797a3b8b3b136c026a
Parents: f32af01
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Mar 14 10:15:19 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Mon Mar 14 10:15:19 2016 -0400
----------------------------------------------------------------------
bro-plugin-kafka/CHANGES | 25 +++--
bro-plugin-kafka/CMakeLists.txt | 42 +++++----
bro-plugin-kafka/MAINTAINER | 18 ++++
bro-plugin-kafka/Makefile | 26 +++---
bro-plugin-kafka/README | 96 +++++++++++++++++++
bro-plugin-kafka/README.md | 88 ------------------
bro-plugin-kafka/VERSION | 17 ++++
bro-plugin-kafka/cmake/FindOpenSSL.cmake | 72 +++++++++++++++
bro-plugin-kafka/configure | 31 +++----
bro-plugin-kafka/configure.plugin | 8 +-
bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro | 19 ++++
.../scripts/Bro/Kafka/logs-to-kafka.bro | 44 +++++++++
.../scripts/Metron/Kafka/__load__.bro | 21 -----
.../scripts/Metron/Kafka/logs-to-kafka.bro | 48 ----------
bro-plugin-kafka/scripts/__load__.bro | 2 -
bro-plugin-kafka/scripts/init.bro | 19 +---
bro-plugin-kafka/src/KafkaWriter.cc | 97 ++++++++++----------
bro-plugin-kafka/src/KafkaWriter.h | 84 ++++++++---------
bro-plugin-kafka/src/MetronJSON.cc | 50 ----------
bro-plugin-kafka/src/MetronJSON.h | 54 -----------
bro-plugin-kafka/src/Plugin.cc | 15 ++-
bro-plugin-kafka/src/Plugin.h | 12 +--
bro-plugin-kafka/src/TaggedJSON.cc | 43 +++++++++
bro-plugin-kafka/src/TaggedJSON.h | 50 ++++++++++
bro-plugin-kafka/src/kafka.bif | 5 +-
bro-plugin-kafka/src/kafka_const.bif | 20 ++++
bro-plugin-kafka/tests/Scripts/get-bro-env | 4 +-
bro-plugin-kafka/tests/kafka/show-plugin.bro | 2 +-
deployment/amazon-ec2/.gitignore | 1 +
deployment/amazon-ec2/README.md | 14 ++-
deployment/amazon-ec2/ansible.cfg | 1 +
deployment/amazon-ec2/playbook.yml | 4 +
deployment/playbooks/ambari_install.yml | 5 +-
deployment/playbooks/metron_install.yml | 7 +-
deployment/roles/bro/tasks/bro-plugin-kafka.yml | 10 +-
deployment/roles/bro/tasks/librdkafka.yml | 11 ++-
deployment/roles/bro/vars/main.yml | 2 +
pom.xml | 44 ++++-----
38 files changed, 617 insertions(+), 494 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/CHANGES
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/CHANGES b/bro-plugin-kafka/CHANGES
index 3df60b0..d9e26de 100644
--- a/bro-plugin-kafka/CHANGES
+++ b/bro-plugin-kafka/CHANGES
@@ -1,17 +1,16 @@
#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
#
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/CMakeLists.txt b/bro-plugin-kafka/CMakeLists.txt
index f578eb2..30bf3b5 100644
--- a/bro-plugin-kafka/CMakeLists.txt
+++ b/bro-plugin-kafka/CMakeLists.txt
@@ -1,39 +1,43 @@
#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
#
cmake_minimum_required(VERSION 2.8)
project(Plugin)
include(BroPlugin)
find_package(LibRDKafka)
+find_package(OpenSSL)
-if ( LIBRDKAFKA_FOUND )
- include_directories(BEFORE ${LibRDKafka_INCLUDE_DIR})
-
- bro_plugin_begin(METRON KAFKA)
+if (LIBRDKAFKA_FOUND AND OPENSSL_FOUND)
+ include_directories(BEFORE ${LibRDKafka_INCLUDE_DIR} ${OpenSSL_INCLUDE_DIR})
+ bro_plugin_begin(BRO KAFKA)
bro_plugin_cc(src/KafkaWriter.cc)
- bro_plugin_cc(src/MetronJSON.cc)
bro_plugin_cc(src/Plugin.cc)
+ bro_plugin_cc(src/TaggedJSON.cc)
bro_plugin_bif(src/kafka.bif)
bro_plugin_dist_files(README CHANGES COPYING VERSION)
bro_plugin_link_library(${LibRDKafka_LIBRARIES})
bro_plugin_link_library(${LibRDKafka_C_LIBRARIES})
+ bro_plugin_link_library(${OpenSSL_LIBRARIES})
bro_plugin_end()
-else ()
+elseif (NOT LIBRDKAFKA_FOUND)
message(FATAL_ERROR "LibRDKafka not found.")
+
+elseif (NOT OPENSSL_FOUND)
+ message(FATAL_ERROR "OpenSSL not found.")
+
endif ()
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/MAINTAINER
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/MAINTAINER b/bro-plugin-kafka/MAINTAINER
new file mode 100644
index 0000000..0ddaed7
--- /dev/null
+++ b/bro-plugin-kafka/MAINTAINER
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+Apache Metron <us...@metron.incubator.apache.org>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/Makefile
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/Makefile b/bro-plugin-kafka/Makefile
index e4157c9..50fa3ca 100644
--- a/bro-plugin-kafka/Makefile
+++ b/bro-plugin-kafka/Makefile
@@ -1,20 +1,18 @@
#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
#
# Convenience Makefile providing a few common top-level targets.
#
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/README
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/README b/bro-plugin-kafka/README
new file mode 100644
index 0000000..c672bd0
--- /dev/null
+++ b/bro-plugin-kafka/README
@@ -0,0 +1,96 @@
+
+===============================
+Writing Logging Output to Kafka
+===============================
+
+A log writer that sends logging output to Kafka. This provides a convenient
+means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to
+process the data generated by Bro.
+
+.. contents::
+
+Installation
+------------
+
+Install librdkafka (https://github.com/edenhill/librdkafka), a native client
+library for Kafka. This plugin has been tested against the latest release of
+librdkafka, which at the time of this writing is v0.8.6.
+
+ # curl -L https://github.com/edenhill/librdkafka/archive/0.8.6.tar.gz | tar xvz
+ # cd librdkafka-0.8.6/
+ # ./configure
+ # make
+ # sudo make install
+
+Then compile this Bro plugin using the following commands.
+
+ # ./configure --bro-dist=$BRO_SRC
+ # make
+ # sudo make install
+
+Run the following command to ensure that the plugin was installed successfully.
+
+ # bro -N Bro::Kafka
+ Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
+
+Activation
+----------
+
+The easiest way to enable Kafka output is to load the plugin's
+``logs-to-kafka.bro`` script. If you are using BroControl, the following lines
+added to local.bro will activate it.
+
+.. console::
+
+ @load Bro/Kafka/logs-to-kafka.bro
+ redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
+ redef Kafka::topic_name = "bro";
+ redef Kafka::kafka_conf = table(
+ ["metadata.broker.list"] = "localhost:9092"
+ );
+
+This example will send all HTTP, DNS, and Conn logs to a Kafka broker running on
+the localhost to a topic called ``bro``. Any configuration value accepted by
+librdkafka can be added to the ``kafka_conf`` configuration table.
+
+Settings
+--------
+
+``kafka_conf``
+
+The global configuration settings for Kafka. These values are passed through
+directly to librdkafka. Any valid librdkafka settings can be defined in this
+table.
+
+.. console::
+
+ redef Kafka::kafka_conf = table(
+ ["metadata.broker.list"] = "localhost:9092",
+ ["client.id"] = "bro"
+ );
+
+``topic_name``
+
+The name of the topic in Kafka where all Bro logs will be sent to.
+
+.. console::
+
+ redef Kafka::topic_name = "bro";
+
+``max_wait_on_shutdown``
+
+The maximum number of milliseconds that the plugin will wait for any backlog of
+queued messages to be sent to Kafka before forced shutdown.
+
+.. console::
+
+ redef Kafka::max_wait_on_shutdown = 3000;
+
+``tag_json``
+
+If true, a log stream identifier is appended to each JSON-formatted message. For
+example, a Conn::LOG message will look like ``{ 'conn' : { ... }}``.
+
+.. console::
+
+ redef Kafka::tag_json = T;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/README.md
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/README.md b/bro-plugin-kafka/README.md
deleted file mode 100644
index 6e965c5..0000000
--- a/bro-plugin-kafka/README.md
+++ /dev/null
@@ -1,88 +0,0 @@
-
-[Bro](https://www.bro.org/) Logging with Kafka
-==============================================
-
-A Bro Log filter that sends log data to [Kafka](http://kafka.apache.org/). This
-provides a convenient means for tools in the Hadoop ecosystem, such as [Storm](http://storm.apache.org/),
-[Spark](http://spark.apache.org/), and others, to process the log data generated by Bro.
-
-Installation
-------------
-
-Install [librdkafka](https://github.com/edenhill/librdkafka), a native client
-library for Kafka.
-
-Download and compile Bro from source code. Replace version 2.4.1 with the
-most recent stable release. See the INSTALL documentation within the Bro release
-for more details on prerequisites and compilation.
-
-```
-curl https://www.bro.org/downloads/release/bro-2.4.1.tar.gz | tar -xvz
-cd bro-2.4.1
-./configure
-make
-sudo make install
-export BRO_SRC=`pwd`
-```
-
-Download and compile this Kafka plugin for Bro.
-
-```
-./configure --bro-dist=$BRO_SRC
-make
-sudo make install
-```
-
-Optional: Instead of running `make install` as above, after the plugin has been
-compiled, the `build` directory can be added to the `BRO_PLUGIN_PATH`. Bro will
-automatically load any plugins that it finds at this location.
-
-```
-export BRO_PLUGIN_PATH="/home/the-dude/bro-plugin-kafka/build/"
-```
-
-Run the following command to ensure that the plugin was installed successfully.
-The 'bro' executable must be in your PATH.
-
-```
-bro -N Metron::Kafka
-```
-
-Getting Started
----------------
-
-After installing the plugin, create a bro script that will activate it. In most cases, this can be placed in the site/local file. In most cases this file is located at `$BRO_HOME/share/bro/site/local.bro`.
-
-This following example Bro script will direct all HTTP, DNS, and Conn logs to a Kafka broker running locally to a topic called `bro`.
-
-```
-@load Metron/Kafka/logs-to-kafka.bro
-redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
-redef Kafka::kafka_broker_list = "localhost:9092";
-redef Kafka::topic_name = "bro";
-```
-
-Next, simply start Bro. If you are starting Bro with `broctl` after updating `local.bro` you will need to run `install` and then `start`.
-
-```
-$ /usr/local/bro/bin/broctl
-
-Welcome to BroControl 1.4
-
-Type "help" for help.
-
-[BroControl] > install
-removing old policies in /usr/local/bro/spool/installed-scripts-do-not-touch/site ...
-removing old policies in /usr/local/bro/spool/installed-scripts-do-not-touch/auto ...
-creating policy directories ...
-installing site policies ...
-generating standalone-layout.bro ...
-generating local-networks.bro ...
-generating broctl-config.bro ...
-generating broctl-config.sh ...
-updating nodes ...
-[BroControl] > start
-starting bro ...
-```
-
-The plugin does not interfere with the existing file-based logging. To validate proper functioning compare the data being received at Kafka to the data in the local log file.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/VERSION
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/VERSION b/bro-plugin-kafka/VERSION
index 49d5957..204e5ca 100644
--- a/bro-plugin-kafka/VERSION
+++ b/bro-plugin-kafka/VERSION
@@ -1 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
0.1
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/cmake/FindOpenSSL.cmake
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/cmake/FindOpenSSL.cmake b/bro-plugin-kafka/cmake/FindOpenSSL.cmake
new file mode 100644
index 0000000..5ed955c
--- /dev/null
+++ b/bro-plugin-kafka/cmake/FindOpenSSL.cmake
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# - Try to find openssl include dirs and libraries
+#
+# Usage of this module as follows:
+#
+# find_package(OpenSSL)
+#
+# Variables used by this module, they can change the default behaviour and need
+# to be set before calling find_package:
+#
+# OpenSSL_ROOT_DIR Set this variable to the root installation of
+# openssl if the module has problems finding the
+# proper installation path.
+#
+# Variables defined by this module:
+#
+# OPENSSL_FOUND System has openssl, include and library dirs found
+# OpenSSL_INCLUDE_DIR The openssl include directories.
+# OpenSSL_LIBRARIES The openssl libraries.
+# OpenSSL_CYRPTO_LIBRARY The openssl crypto library.
+# OpenSSL_SSL_LIBRARY The openssl ssl library.
+
+find_path(OpenSSL_ROOT_DIR
+ NAMES include/openssl/ssl.h
+)
+
+find_path(OpenSSL_INCLUDE_DIR
+ NAMES openssl/ssl.h
+ HINTS ${OpenSSL_ROOT_DIR}/include
+)
+
+find_library(OpenSSL_SSL_LIBRARY
+ NAMES ssl ssleay32 ssleay32MD
+ HINTS ${OpenSSL_ROOT_DIR}/lib
+)
+
+find_library(OpenSSL_CRYPTO_LIBRARY
+ NAMES crypto
+ HINTS ${OpenSSL_ROOT_DIR}/lib
+)
+
+set(OpenSSL_LIBRARIES ${OpenSSL_SSL_LIBRARY} ${OpenSSL_CRYPTO_LIBRARY}
+ CACHE STRING "OpenSSL SSL and crypto libraries" FORCE)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(OpenSSL DEFAULT_MSG
+ OpenSSL_LIBRARIES
+ OpenSSL_INCLUDE_DIR
+)
+
+mark_as_advanced(
+ OpenSSL_ROOT_DIR
+ OpenSSL_INCLUDE_DIR
+ OpenSSL_LIBRARIES
+ OpenSSL_CRYPTO_LIBRARY
+ OpenSSL_SSL_LIBRARY
+)
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/configure
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/configure b/bro-plugin-kafka/configure
index b496886..d053488 100755
--- a/bro-plugin-kafka/configure
+++ b/bro-plugin-kafka/configure
@@ -1,21 +1,20 @@
#!/bin/sh
#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+# http://www.apache.org/licenses/LICENSE-2.0
#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
#
# Wrapper for viewing/setting options that the plugin's CMake
# scripts will recognize.
@@ -84,15 +83,15 @@ while [ $# -ne 0 ]; do
--help|-h)
usage
;;
-
--bro-dist=*)
brodist=`cd $optarg && pwd`
;;
-
--install-root=*)
installroot=$optarg
;;
-
+ --with-openssl=*)
+ append_cache_entry OpenSSL_ROOT_DIR PATH $optarg
+ ;;
*)
if type plugin_option >/dev/null 2>&1; then
plugin_option $1 && shift && continue;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/configure.plugin
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/configure.plugin b/bro-plugin-kafka/configure.plugin
index f5cb384..1cb2086 100644
--- a/bro-plugin-kafka/configure.plugin
+++ b/bro-plugin-kafka/configure.plugin
@@ -1,5 +1,6 @@
#!/bin/sh
#
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -22,6 +23,7 @@ plugin_usage()
{
cat <<EOF
--with-librdkafka=PATH path to librdkafka
+ --with-openssl=PATH path to OpenSSL install root
EOF
}
@@ -30,8 +32,10 @@ plugin_option()
case "$1" in
--with-librdkafka=*)
append_cache_entry LibRdKafka_ROOT_DIR PATH $optarg
- ;;
-
+ ;;
+ --with-openssl=*)
+ append_cache_entry OpenSSL_ROOT_DIR PATH $optarg
+ ;;
*)
return 1;
;;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro b/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
new file mode 100644
index 0000000..12295a9
--- /dev/null
+++ b/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This is loaded when a user activates the plugin. Include scripts here that should be
+# loaded automatically at that point.
+#
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro b/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
new file mode 100644
index 0000000..84e390c
--- /dev/null
+++ b/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+##! load this script to enable log output to kafka
+
+module Kafka;
+
+export {
+ ##
+ ## which log streams should be sent to kafka?
+ ## example:
+ ## redef Kafka::logs_to_send = set(Conn::Log, HTTP::LOG, DNS::LOG);
+ ##
+ const logs_to_send: set[Log::ID] &redef;
+}
+
+event bro_init() &priority=-5
+{
+ for (stream_id in Log::active_streams)
+ {
+ if (stream_id in Kafka::logs_to_send)
+ {
+ local filter: Log::Filter = [
+ $name = fmt("kafka-%s", stream_id),
+ $writer = Log::WRITER_KAFKAWRITER
+ ];
+
+ Log::add_filter(stream_id, filter);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/Metron/Kafka/__load__.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/Metron/Kafka/__load__.bro b/bro-plugin-kafka/scripts/Metron/Kafka/__load__.bro
deleted file mode 100644
index 36913d3..0000000
--- a/bro-plugin-kafka/scripts/Metron/Kafka/__load__.bro
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#
-# This is loaded when a user activates the plugin. Include scripts here that should be
-# loaded automatically at that point.
-#
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/Metron/Kafka/logs-to-kafka.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/Metron/Kafka/logs-to-kafka.bro b/bro-plugin-kafka/scripts/Metron/Kafka/logs-to-kafka.bro
deleted file mode 100644
index 5d0af0c..0000000
--- a/bro-plugin-kafka/scripts/Metron/Kafka/logs-to-kafka.bro
+++ /dev/null
@@ -1,48 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-##! load this script to enable log output to kafka
-
-module Kafka;
-
-export {
- ##
- ## which log streams should be sent to kafka?
- ## example:
- ## redef Kafka::logs_to_send = set(Conn::Log, HTTP::LOG, DNS::LOG);
- ##
- const logs_to_send: set[Log::ID] &redef;
-}
-
-event bro_init() &priority=-5 {
- if(kafka_broker_list == "" || topic_name == "") {
- return;
- }
-
- for (stream_id in Log::active_streams) {
- if (stream_id in Kafka::logs_to_send) {
- local stream_str = fmt("%s", stream_id);
-
- local filter: Log::Filter = [
- $name = fmt("kafka-%s", stream_str),
- $writer = Log::WRITER_KAFKAWRITER
- ];
-
- Log::add_filter(stream_id, filter);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/__load__.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/__load__.bro b/bro-plugin-kafka/scripts/__load__.bro
index e3db306..fee9549 100644
--- a/bro-plugin-kafka/scripts/__load__.bro
+++ b/bro-plugin-kafka/scripts/__load__.bro
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-#
# This is loaded unconditionally at Bro startup. Include scripts here that should
# always be loaded.
#
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/init.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/init.bro b/bro-plugin-kafka/scripts/init.bro
index 9515f88..c76b2a6 100644
--- a/bro-plugin-kafka/scripts/init.bro
+++ b/bro-plugin-kafka/scripts/init.bro
@@ -18,19 +18,10 @@
module Kafka;
export {
- #
- # the kafka broker(s) to which data will be sent
- #
- const kafka_broker_list: string = "localhost:9092" &redef;
-
- #
- # the name of the kafka topic
- #
const topic_name: string = "bro" &redef;
-
- #
- # the maximum amount of time in milliseconds to wait for
- # kafka message delivery upon shutdown
- #
- const max_wait_on_delivery: count = 3000 &redef;
+ const max_wait_on_shutdown: count = 3000 &redef;
+ const tag_json: bool = F &redef;
+ const kafka_conf: table[string] of string = table(
+ ["metadata.broker.list"] = "localhost:9092"
+ ) &redef;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/KafkaWriter.cc
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/KafkaWriter.cc b/bro-plugin-kafka/src/KafkaWriter.cc
index 1e38f7f..9019790 100644
--- a/bro-plugin-kafka/src/KafkaWriter.cc
+++ b/bro-plugin-kafka/src/KafkaWriter.cc
@@ -14,58 +14,61 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-#include <librdkafka/rdkafkacpp.h>
-#include <logging/WriterBackend.h>
-#include <logging/WriterFrontend.h>
+
+#include <Type.h>
+#include <threading/Formatter.h>
#include <threading/formatters/JSON.h>
-#include "KafkaWriter.h"
#include "kafka.bif.h"
-#include "MetronJSON.h"
-
-using metron::kafka::KafkaWriter;
-using logging::WriterBackend;
-using logging::WriterFrontend;
-using threading::Value;
-using threading::Field;
-using threading::formatter::JSON;
-using metron::formatter::MetronJSON;
-
-KafkaWriter::KafkaWriter(WriterFrontend* frontend)
- : WriterBackend(frontend)
- , formatter(NULL)
- , producer(NULL)
- , topic(NULL)
-{
- // kafka broker setting
- kafka_broker_list.assign(
- (const char*)BifConst::Kafka::kafka_broker_list->Bytes(),
- BifConst::Kafka::kafka_broker_list->Len());
+#include "TaggedJSON.h"
+#include "KafkaWriter.h"
- // topic name setting
+using namespace logging;
+using namespace writer;
+
+KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL)
+{
+ // TODO do we need this??
topic_name.assign((const char*)BifConst::Kafka::topic_name->Bytes(),
BifConst::Kafka::topic_name->Len());
-
- // max wait for queued messages to send on shutdown
- max_wait_on_delivery = BifConst::Kafka::max_wait_on_delivery;
}
-KafkaWriter::~KafkaWriter() {}
+KafkaWriter::~KafkaWriter()
+{}
-bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields,
- const Field* const* fields)
+bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
{
// initialize the formatter
- // 'info.path' indicates the log stream type; aka HTTP::LOG, DNS::LOG
- delete formatter;
- formatter = new MetronJSON(info.path, this, JSON::TS_EPOCH);
+ if(BifConst::Kafka::tag_json) {
+ formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
+ } else {
+ formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
+ }
// kafka global configuration
string err;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
- if (RdKafka::Conf::CONF_OK != conf->set("metadata.broker.list", kafka_broker_list, err)) {
- reporter->Error("Failed to set metatdata.broker.list: %s", err.c_str());
- return false;
+
+ // apply the user-defined settings to kafka
+ Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
+ IterCookie* c = val->AsTable()->InitForIteration();
+ HashKey* k;
+ TableEntryVal* v;
+ while ((v = val->AsTable()->NextEntry(k, c))) {
+
+ // fetch the key and value
+ ListVal* index = val->AsTableVal()->RecoverIndex(k);
+ string key = index->Index(0)->AsString()->CheckString();
+ string val = v->Value()->AsString()->CheckString();
+
+ // apply setting to kafka
+ if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+ reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
+ return false;
+ }
+
+ // cleanup
+ Unref(index);
+ delete k;
}
// create kafka producer
@@ -94,13 +97,14 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields,
bool KafkaWriter::DoFinish(double network_time)
{
bool success = false;
- int interval = 1000;
+ int poll_interval = 1000;
int waited = 0;
+ int max_wait = BifConst::Kafka::max_wait_on_shutdown;
// wait a bit for queued messages to be delivered
- while (producer->outq_len() > 0 && waited < max_wait_on_delivery) {
- producer->poll(interval);
- waited += interval;
+ while (producer->outq_len() > 0 && waited <= max_wait) {
+ producer->poll(poll_interval);
+ waited += poll_interval;
}
// successful only if all messages delivered
@@ -111,6 +115,7 @@ bool KafkaWriter::DoFinish(double network_time)
delete topic;
delete producer;
+ delete formatter;
return success;
}
@@ -119,8 +124,7 @@ bool KafkaWriter::DoFinish(double network_time)
* Writer-specific output method implementing recording of one log
* entry.
*/
-bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields,
- threading::Value** vals)
+bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals)
{
ODesc buff;
buff.Clear();
@@ -180,11 +184,10 @@ bool KafkaWriter::DoFlush(double network_time)
* FinishedRotation() to signal the log manager that potential
* postprocessors can now run.
*/
-bool KafkaWriter::DoRotate(const char* rotated_path, double open, double close,
- bool terminating)
+bool KafkaWriter::DoRotate(const char* rotated_path, double open, double close, bool terminating)
{
// no need to perform log rotation
- return true;
+ return FinishedRotation();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/KafkaWriter.h
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/KafkaWriter.h b/bro-plugin-kafka/src/KafkaWriter.h
index d63e2dd..2299667 100644
--- a/bro-plugin-kafka/src/KafkaWriter.h
+++ b/bro-plugin-kafka/src/KafkaWriter.h
@@ -14,63 +14,53 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-#ifndef BRO_PLUGIN_METRON_KAFKA_KAFKAWRITER_H
-#define BRO_PLUGIN_METRON_KAFKA_KAFKAWRITER_H
-#include <logging/WriterBackend.h>
-#include <logging/WriterFrontend.h>
-#include <threading/Formatter.h>
+#ifndef BRO_PLUGIN_BRO_KAFKA_KAFKAWRITER_H
+#define BRO_PLUGIN_BRO_KAFKA_KAFKAWRITER_H
+
+#include <string>
#include <librdkafka/rdkafkacpp.h>
+#include <logging/WriterBackend.h>
+#include <threading/formatters/JSON.h>
+#include <Type.h>
+#include "kafka.bif.h"
-using logging::WriterBackend;
-using logging::WriterFrontend;
-using threading::Value;
-using threading::Field;
-using threading::formatter::Formatter;
+#include "TaggedJSON.h"
-namespace metron {
-namespace kafka {
+namespace logging { namespace writer {
- /**
+/**
* A logging writer that sends data to a Kafka broker.
*/
- class KafkaWriter : public WriterBackend {
- public:
- KafkaWriter(WriterFrontend* frontend);
- ~KafkaWriter();
+class KafkaWriter : public WriterBackend {
+
+public:
+ KafkaWriter(WriterFrontend* frontend);
+ ~KafkaWriter();
- static WriterBackend* Instantiate(WriterFrontend* frontend)
- {
- return new KafkaWriter(frontend);
- }
+ static WriterBackend* Instantiate(WriterFrontend* frontend)
+ {
+ return new KafkaWriter(frontend);
+ }
- protected:
- virtual bool DoInit(const WriterBackend::WriterInfo& info, int num_fields,
- const threading::Field* const* fields);
- virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
- threading::Value** vals);
- virtual bool DoSetBuf(bool enabled);
- virtual bool DoRotate(const char* rotated_path, double open, double close,
- bool terminating);
- virtual bool DoFlush(double network_time);
- virtual bool DoFinish(double network_time);
- virtual bool DoHeartbeat(double network_time, double current_time);
+protected:
+ virtual bool DoInit(const WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const* fields);
+ virtual bool DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals);
+ virtual bool DoSetBuf(bool enabled);
+ virtual bool DoRotate(const char* rotated_path, double open, double close, bool terminating);
+ virtual bool DoFlush(double network_time);
+ virtual bool DoFinish(double network_time);
+ virtual bool DoHeartbeat(double network_time, double current_time);
- private:
- // values defined within 'bro-space'; must match kafka.bif, scripts/init.bro
- string kafka_broker_list;
- string topic_name;
- int max_wait_on_delivery;
+private:
+ string topic_name;
+ threading::formatter::Formatter *formatter;
+ RdKafka::Producer* producer;
+ RdKafka::Topic* topic;
+ RdKafka::Conf* conf;
+ RdKafka::Conf* topic_conf;
+};
- // other
- Formatter* formatter;
- RdKafka::Producer* producer;
- RdKafka::Topic* topic;
- RdKafka::Conf* conf;
- RdKafka::Conf* topic_conf;
- };
-}
-}
+}}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/MetronJSON.cc
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/MetronJSON.cc b/bro-plugin-kafka/src/MetronJSON.cc
deleted file mode 100644
index ec84bec..0000000
--- a/bro-plugin-kafka/src/MetronJSON.cc
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <threading/Formatter.h>
-#include "MetronJSON.h"
-
-using metron::formatter::MetronJSON;
-using threading::formatter::JSON;
-using threading::MsgThread;
-using threading::Field;
-using threading::Value;
-
-MetronJSON::MetronJSON(string sn, MsgThread* t, TimeFormat tf)
- : JSON(t, tf)
- , stream_name(sn)
-{
-}
-
-MetronJSON::~MetronJSON() {}
-
-bool MetronJSON::Describe(ODesc* desc, int num_fields,
- const Field* const* fields, Value** vals) const
-{
- desc->AddRaw("{");
-
- // prepend the stream name
- desc->AddRaw("\"");
- desc->AddRaw(stream_name);
- desc->AddRaw("\": ");
-
- // append the JSON formatted log record itself
- JSON::Describe(desc, num_fields, fields, vals);
-
- desc->AddRaw("}");
- return true;
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/MetronJSON.h
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/MetronJSON.h b/bro-plugin-kafka/src/MetronJSON.h
deleted file mode 100644
index b9d9185..0000000
--- a/bro-plugin-kafka/src/MetronJSON.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef BRO_PLUGIN_METRON_KAFKA_METRONJSON_H
-#define BRO_PLUGIN_METRON_KAFKA_METRONJSON_H
-
-#include <string>
-#include <threading/Formatter.h>
-#include <threading/formatters/JSON.h>
-
-using threading::Field;
-using threading::Value;
-using threading::formatter::JSON;
-
-namespace metron {
-namespace formatter {
-
- /**
- * A formatter that produces bro records in a format accepted by
- * Metron. Specifically, the stream ID is prepended to each JSON
- * formatted log record.
- *
- * {"conn": { ... }}
- * {"dns": { ... }}
- */
- class MetronJSON : public JSON {
-
- public:
- MetronJSON(string stream_name, threading::MsgThread* t, TimeFormat tf);
- virtual ~MetronJSON();
- virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields,
- Value** vals) const;
-
- private:
- string stream_name;
- };
-}
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/Plugin.cc
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/Plugin.cc b/bro-plugin-kafka/src/Plugin.cc
index f9ba7fb..d523d23 100644
--- a/bro-plugin-kafka/src/Plugin.cc
+++ b/bro-plugin-kafka/src/Plugin.cc
@@ -14,25 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
#include "Plugin.h"
#include "KafkaWriter.h"
-namespace plugin {
-namespace Metron_Kafka {
+namespace plugin { namespace Bro_Kafka {
Plugin plugin;
-}
-}
+}}
-using namespace plugin::Metron_Kafka;
+using namespace plugin::Bro_Kafka;
plugin::Configuration Plugin::Configure()
{
- AddComponent(new ::logging::Component(
- "KafkaWriter", ::metron::kafka::KafkaWriter::Instantiate));
+ AddComponent(new ::logging::Component("KafkaWriter", ::logging::writer::KafkaWriter::Instantiate));
plugin::Configuration config;
- config.name = "Metron::Kafka";
+ config.name = "Bro::Kafka";
config.description = "Writes logs to Kafka";
config.version.major = 0;
config.version.minor = 1;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/Plugin.h
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/Plugin.h b/bro-plugin-kafka/src/Plugin.h
index 4313a3c..8adeb18 100644
--- a/bro-plugin-kafka/src/Plugin.h
+++ b/bro-plugin-kafka/src/Plugin.h
@@ -14,14 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-#ifndef BRO_PLUGIN_METRON_KAFKA
-#define BRO_PLUGIN_METRON_KAFKA
+
+#ifndef BRO_PLUGIN_BRO_KAFKA
+#define BRO_PLUGIN_BRO_KAFKA
#include <plugin/Plugin.h>
-namespace plugin {
-namespace Metron_Kafka {
+namespace plugin { namespace Bro_Kafka {
class Plugin : public ::plugin::Plugin {
protected:
@@ -30,7 +29,6 @@ namespace Metron_Kafka {
};
extern Plugin plugin;
-}
-}
+}}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/TaggedJSON.cc
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/TaggedJSON.cc b/bro-plugin-kafka/src/TaggedJSON.cc
new file mode 100644
index 0000000..db3f305
--- /dev/null
+++ b/bro-plugin-kafka/src/TaggedJSON.cc
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TaggedJSON.h"
+
+namespace threading { namespace formatter {
+
+TaggedJSON::TaggedJSON(string sn, MsgThread* t, JSON::TimeFormat tf): JSON(t, tf), stream_name(sn)
+{}
+
+TaggedJSON::~TaggedJSON()
+{}
+
+bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const
+{
+ desc->AddRaw("{");
+
+ // 'tag' the json; aka prepend the stream name to the json-formatted log content
+ desc->AddRaw("\"");
+ desc->AddRaw(stream_name);
+ desc->AddRaw("\": ");
+
+ // append the JSON formatted log record itself
+ JSON::Describe(desc, num_fields, fields, vals);
+
+ desc->AddRaw("}");
+ return true;
+}
+}}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/TaggedJSON.h
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/TaggedJSON.h b/bro-plugin-kafka/src/TaggedJSON.h
new file mode 100644
index 0000000..08a50df
--- /dev/null
+++ b/bro-plugin-kafka/src/TaggedJSON.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BRO_PLUGIN_BRO_KAFKA_TAGGEDJSON_H
+#define BRO_PLUGIN_BRO_KAFKA_TAGGEDJSON_H
+
+#include <string>
+#include <threading/Formatter.h>
+#include <threading/formatters/JSON.h>
+
+using threading::formatter::JSON;
+using threading::MsgThread;
+using threading::Value;
+using threading::Field;
+
+namespace threading { namespace formatter {
+
+/*
+ * A JSON formatter that prepends or 'tags' the content with a log stream
+ * identifier. For example,
+ * { 'conn' : { ... }}
+ * { 'http' : { ... }}
+ */
+class TaggedJSON : public JSON {
+
+public:
+ TaggedJSON(string stream_name, MsgThread* t, JSON::TimeFormat tf);
+ virtual ~TaggedJSON();
+ virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const;
+
+private:
+ string stream_name;
+};
+
+}}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/kafka.bif
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/kafka.bif b/bro-plugin-kafka/src/kafka.bif
index a95937f..8a8070c 100644
--- a/bro-plugin-kafka/src/kafka.bif
+++ b/bro-plugin-kafka/src/kafka.bif
@@ -17,6 +17,7 @@
module Kafka;
-const kafka_broker_list: string;
+const kafka_conf: config;
const topic_name: string;
-const max_wait_on_delivery: count;
+const max_wait_on_shutdown: count;
+const tag_json: bool;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/kafka_const.bif
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/kafka_const.bif b/bro-plugin-kafka/src/kafka_const.bif
new file mode 100644
index 0000000..989c0ae
--- /dev/null
+++ b/bro-plugin-kafka/src/kafka_const.bif
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Kafka;
+
+type config : table[string] of string;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/tests/Scripts/get-bro-env
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/tests/Scripts/get-bro-env b/bro-plugin-kafka/tests/Scripts/get-bro-env
index 80cfb11..8aa0ea7 100755
--- a/bro-plugin-kafka/tests/Scripts/get-bro-env
+++ b/bro-plugin-kafka/tests/Scripts/get-bro-env
@@ -15,10 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-#
# BTest helper for getting values for Bro-related environment variables.
-#
+
base=`dirname $0`
bro=`cat ${base}/../../build/CMakeCache.txt | grep BRO_DIST | cut -d = -f 2`
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/tests/kafka/show-plugin.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/tests/kafka/show-plugin.bro b/bro-plugin-kafka/tests/kafka/show-plugin.bro
index b305382..4e8dd6a 100644
--- a/bro-plugin-kafka/tests/kafka/show-plugin.bro
+++ b/bro-plugin-kafka/tests/kafka/show-plugin.bro
@@ -15,5 +15,5 @@
# limitations under the License.
#
-# @TEST-EXEC: bro -NN Metron::Kafka >output
+# @TEST-EXEC: bro -NN Bro::Kafka >output
# @TEST-EXEC: btest-diff output
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/amazon-ec2/.gitignore
----------------------------------------------------------------------
diff --git a/deployment/amazon-ec2/.gitignore b/deployment/amazon-ec2/.gitignore
index e066c7b..38b03a9 100644
--- a/deployment/amazon-ec2/.gitignore
+++ b/deployment/amazon-ec2/.gitignore
@@ -1,2 +1,3 @@
*.pem
*.secret
+*.log
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/amazon-ec2/README.md
----------------------------------------------------------------------
diff --git a/deployment/amazon-ec2/README.md b/deployment/amazon-ec2/README.md
index 63f0c89..c04f39c 100644
--- a/deployment/amazon-ec2/README.md
+++ b/deployment/amazon-ec2/README.md
@@ -43,11 +43,21 @@ Each of the provisioned hosts will be externally accessible from the internet at
ssh centos@ec2-52-91-215-174.compute-1.amazonaws.com
```
-Multiple Environments
----------------------
+Usage
+-----
+
+### Multiple Environments
This process can support provisioning of multiple, isolated environments. Simply change the `env` settings in `conf/defaults.yml`. For example, you might provision separate development, test, and production environments.
```
env: metron-test
```
+
+### Selective Provisioning
+
+To provision only subsets of the entire Metron deployment, Ansible tags can be specified. For example, to only deploy the sensors on an Amazon EC2 environment, run the following command.
+
+```
+ansible-playbook -i ec2.py playbook.yml --tags "ec2,sensors"
+```
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/amazon-ec2/ansible.cfg
----------------------------------------------------------------------
diff --git a/deployment/amazon-ec2/ansible.cfg b/deployment/amazon-ec2/ansible.cfg
index e79770c..c8f26c4 100644
--- a/deployment/amazon-ec2/ansible.cfg
+++ b/deployment/amazon-ec2/ansible.cfg
@@ -21,6 +21,7 @@ roles_path = ../roles
pipelining = True
remote_user = centos
forks = 20
+log_path = ./ansible.log
# fix for "ssh throws 'unix domain socket too long' " problem
[ssh_connection]
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/amazon-ec2/playbook.yml
----------------------------------------------------------------------
diff --git a/deployment/amazon-ec2/playbook.yml b/deployment/amazon-ec2/playbook.yml
index b269cfa..b7d9cf3 100644
--- a/deployment/amazon-ec2/playbook.yml
+++ b/deployment/amazon-ec2/playbook.yml
@@ -32,6 +32,8 @@
- include: tasks/create-hosts.yml host_count=1 host_type=ambari_slave,enrichment,metron,ec2
- include: tasks/create-hosts.yml host_count=3 host_type=search,metron,ec2
- include: tasks/create-hosts.yml host_count=1 host_type=web,mysql,metron,ec2
+ tags:
+ - ec2
#
# mount additional data volumes on all ec2 hosts
@@ -42,6 +44,8 @@
- include: tasks/mount-volume.yml vol_src=/dev/xvdb vol_mnt=/data1
- include: tasks/mount-volume.yml vol_src=/dev/xvdc vol_mnt=/data2
- include: tasks/expand-volume.yml volume=/dev/xvda
+ tags:
+ - ec2
#
# build the metron cluster
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/playbooks/ambari_install.yml
----------------------------------------------------------------------
diff --git a/deployment/playbooks/ambari_install.yml b/deployment/playbooks/ambari_install.yml
index 68d067e..e6d226b 100644
--- a/deployment/playbooks/ambari_install.yml
+++ b/deployment/playbooks/ambari_install.yml
@@ -18,8 +18,9 @@
- hosts: ec2
become: true
tasks:
- - debug: msg="Detected EC2 - including defaults"
- - include_vars: ../amazon-ec2/conf/defaults.yml
+ - include_vars: ../amazon-ec2/conf/defaults.yml
+ tags:
+ - ec2
- hosts: ambari_*
become: true
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/playbooks/metron_install.yml
----------------------------------------------------------------------
diff --git a/deployment/playbooks/metron_install.yml b/deployment/playbooks/metron_install.yml
index daa512a..5c4e085 100644
--- a/deployment/playbooks/metron_install.yml
+++ b/deployment/playbooks/metron_install.yml
@@ -18,9 +18,10 @@
- hosts: ec2
become: true
tasks:
- - debug: msg="Detected EC2 - including defaults"
- - include_vars: ../amazon-ec2/conf/defaults.yml
-
+ - include_vars: ../amazon-ec2/conf/defaults.yml
+ tags:
+ - ec2
+
- hosts: metron
become: true
roles:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/roles/bro/tasks/bro-plugin-kafka.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/bro/tasks/bro-plugin-kafka.yml b/deployment/roles/bro/tasks/bro-plugin-kafka.yml
index 8e953ba..435bc4e 100644
--- a/deployment/roles/bro/tasks/bro-plugin-kafka.yml
+++ b/deployment/roles/bro/tasks/bro-plugin-kafka.yml
@@ -15,7 +15,7 @@
# limitations under the License.
#
---
-- name: Distribute bro plugin
+- name: Distribute bro-kafka plugin
copy: src=../../../bro-plugin-kafka dest=/tmp mode=0755
- name: Compile and install the plugin
@@ -24,16 +24,18 @@
chdir: "/tmp/bro-plugin-kafka"
creates: /usr/local/bro/lib/bro/plugins/METRON_KAFKA
with_items:
+ - rm -rf build/
- "./configure --bro-dist=/tmp/bro-{{ bro_version }}"
- make
- make install
-- name: Configure bro plugin
+- name: Configure bro-kafka plugin
lineinfile:
dest: /usr/local/bro/share/bro/site/local.bro
line: "{{ item }}"
with_items:
- - "@load Metron/Kafka/logs-to-kafka.bro"
+ - "@load Bro/Kafka/logs-to-kafka.bro"
- "redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);"
- - "redef Kafka::kafka_broker_list = \"{{ kafka_broker_url }}\";"
- "redef Kafka::topic_name = \"{{ bro_topic }}\";"
+ - "redef Kafka::tag_json = T;"
+ - "redef Kafka::kafka_conf = table([\"metadata.broker.list\"] = \"{{ kafka_broker_url }}\");"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/roles/bro/tasks/librdkafka.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/bro/tasks/librdkafka.yml b/deployment/roles/bro/tasks/librdkafka.yml
index 561bae2..925c18f 100644
--- a/deployment/roles/bro/tasks/librdkafka.yml
+++ b/deployment/roles/bro/tasks/librdkafka.yml
@@ -17,22 +17,23 @@
---
- name: Download librdkafka
get_url:
- url: https://github.com/edenhill/librdkafka/archive/v0.9.0.tar.gz
- dest: /tmp/librdkafka-0.9.0.tar.gz
+ url: "{{ librdkafka_url }}"
+ dest: "/tmp/librdkafka-{{ librdkafka_version }}.tar.gz"
- name: Extract librdkafka tarball
unarchive:
- src: /tmp/librdkafka-0.9.0.tar.gz
+ src: "/tmp/librdkafka-{{ librdkafka_version }}.tar.gz"
dest: /tmp
copy: no
- creates: /tmp/librdkafka-0.9.0
+ creates: "/tmp/librdkafka-{{ librdkafka_version }}"
- name: Compile and install librdkafka
shell: "{{ item }}"
args:
- chdir: /tmp/librdkafka-0.9.0
+ chdir: "/tmp/librdkafka-{{ librdkafka_version }}"
creates: /usr/local/lib/librdkafka.so
with_items:
+ - rm -rf build/
- ./configure
- make
- make install
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/roles/bro/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/bro/vars/main.yml b/deployment/roles/bro/vars/main.yml
index 88e6f64..8141253 100644
--- a/deployment/roles/bro/vars/main.yml
+++ b/deployment/roles/bro/vars/main.yml
@@ -17,3 +17,5 @@
---
bro_version: 2.4.1
bro_topic: bro
+librdkafka_version: 0.8.6
+librdkafka_url: https://github.com/edenhill/librdkafka/archive/0.8.6.tar.gz
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d8ac9d1..d2949e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,32 +29,34 @@
<description>Performs release auditing for Metron.</description>
<url>https://metron.incubator.apache.org/</url>
<build>
- <plugins>
+ <plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.11</version>
<configuration>
- <excludes>
- <exclude>**/README.md</exclude>
- <exclude>**/VERSION</exclude>
- <exclude>**/*.json</exclude>
- <exclude>**/*.log</exclude>
- <exclude>**/*.ldif</exclude>
- <exclude>**/*.template</exclude>
- <exclude>**/.*</exclude>
- <exclude>**/.*/**</exclude>
- <exclude>**/*.seed</exclude>
- <exclude>**/*.iml</exclude>
- <exclude>**/ansible.cfg</exclude>
- <exclude>site/**</exclude>
- <exclude>metron-ui/lib/public/**</exclude>
- <exclude>**/src/main/resources/patterns/**</exclude>
- <exclude>**/src/test/resources/**</exclude>
- <exclude>**/src/main/resources/Sample*/**</exclude>
- <exclude>**/dependency-reduced-pom.xml</exclude>
- <exclude>**/files/opensoc-ui</exclude>
- </excludes>
+ <excludes>
+ <exclude>**/README.md</exclude>
+ <exclude>**/VERSION</exclude>
+ <exclude>**/*.json</exclude>
+ <exclude>**/*.log</exclude>
+ <exclude>**/*.ldif</exclude>
+ <exclude>**/*.template</exclude>
+ <exclude>**/.*</exclude>
+ <exclude>**/.*/**</exclude>
+ <exclude>**/*.seed</exclude>
+ <exclude>**/*.iml</exclude>
+ <exclude>**/ansible.cfg</exclude>
+ <exclude>site/**</exclude>
+ <exclude>metron-ui/lib/public/**</exclude>
+ <exclude>**/src/main/resources/patterns/**</exclude>
+ <exclude>**/src/test/resources/**</exclude>
+ <exclude>**/src/main/resources/Sample*/**</exclude>
+ <exclude>**/dependency-reduced-pom.xml</exclude>
+ <exclude>**/files/opensoc-ui</exclude>
+ <exclude>**/target/**</exclude>
+ <exclude>**/bro-plugin-kafka/build/**</exclude>
+ </excludes>
</configuration>
</plugin>
</plugins>