You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/21 18:06:15 UTC

[33/43] incubator-metron git commit: METRON-64 Fixed 'dns/Log::WRITER_KAFKAWRITER' Error (nickwallen via cestella) closes apache/incubator-metron#43

METRON-64 Fixed 'dns/Log::WRITER_KAFKAWRITER' Error (nickwallen via cestella) closes apache/incubator-metron#43


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/75666533
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/75666533
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/75666533

Branch: refs/heads/Metron_0.1BETA
Commit: 75666533c74befd2de4c25797a3b8b3b136c026a
Parents: f32af01
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Mar 14 10:15:19 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Mon Mar 14 10:15:19 2016 -0400

----------------------------------------------------------------------
 bro-plugin-kafka/CHANGES                        | 25 +++--
 bro-plugin-kafka/CMakeLists.txt                 | 42 +++++----
 bro-plugin-kafka/MAINTAINER                     | 18 ++++
 bro-plugin-kafka/Makefile                       | 26 +++---
 bro-plugin-kafka/README                         | 96 +++++++++++++++++++
 bro-plugin-kafka/README.md                      | 88 ------------------
 bro-plugin-kafka/VERSION                        | 17 ++++
 bro-plugin-kafka/cmake/FindOpenSSL.cmake        | 72 +++++++++++++++
 bro-plugin-kafka/configure                      | 31 +++----
 bro-plugin-kafka/configure.plugin               |  8 +-
 bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro | 19 ++++
 .../scripts/Bro/Kafka/logs-to-kafka.bro         | 44 +++++++++
 .../scripts/Metron/Kafka/__load__.bro           | 21 -----
 .../scripts/Metron/Kafka/logs-to-kafka.bro      | 48 ----------
 bro-plugin-kafka/scripts/__load__.bro           |  2 -
 bro-plugin-kafka/scripts/init.bro               | 19 +---
 bro-plugin-kafka/src/KafkaWriter.cc             | 97 ++++++++++----------
 bro-plugin-kafka/src/KafkaWriter.h              | 84 ++++++++---------
 bro-plugin-kafka/src/MetronJSON.cc              | 50 ----------
 bro-plugin-kafka/src/MetronJSON.h               | 54 -----------
 bro-plugin-kafka/src/Plugin.cc                  | 15 ++-
 bro-plugin-kafka/src/Plugin.h                   | 12 +--
 bro-plugin-kafka/src/TaggedJSON.cc              | 43 +++++++++
 bro-plugin-kafka/src/TaggedJSON.h               | 50 ++++++++++
 bro-plugin-kafka/src/kafka.bif                  |  5 +-
 bro-plugin-kafka/src/kafka_const.bif            | 20 ++++
 bro-plugin-kafka/tests/Scripts/get-bro-env      |  4 +-
 bro-plugin-kafka/tests/kafka/show-plugin.bro    |  2 +-
 deployment/amazon-ec2/.gitignore                |  1 +
 deployment/amazon-ec2/README.md                 | 14 ++-
 deployment/amazon-ec2/ansible.cfg               |  1 +
 deployment/amazon-ec2/playbook.yml              |  4 +
 deployment/playbooks/ambari_install.yml         |  5 +-
 deployment/playbooks/metron_install.yml         |  7 +-
 deployment/roles/bro/tasks/bro-plugin-kafka.yml | 10 +-
 deployment/roles/bro/tasks/librdkafka.yml       | 11 ++-
 deployment/roles/bro/vars/main.yml              |  2 +
 pom.xml                                         | 44 ++++-----
 38 files changed, 617 insertions(+), 494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/CHANGES
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/CHANGES b/bro-plugin-kafka/CHANGES
index 3df60b0..d9e26de 100644
--- a/bro-plugin-kafka/CHANGES
+++ b/bro-plugin-kafka/CHANGES
@@ -1,17 +1,16 @@
 #
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
 #

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/CMakeLists.txt b/bro-plugin-kafka/CMakeLists.txt
index f578eb2..30bf3b5 100644
--- a/bro-plugin-kafka/CMakeLists.txt
+++ b/bro-plugin-kafka/CMakeLists.txt
@@ -1,39 +1,43 @@
 #
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
 #
 
 cmake_minimum_required(VERSION 2.8)
 project(Plugin)
 include(BroPlugin)
 find_package(LibRDKafka)
+find_package(OpenSSL)
 
-if ( LIBRDKAFKA_FOUND )
-  include_directories(BEFORE ${LibRDKafka_INCLUDE_DIR})
-
-  bro_plugin_begin(METRON KAFKA)
+if (LIBRDKAFKA_FOUND AND OPENSSL_FOUND)
+  include_directories(BEFORE ${LibRDKafka_INCLUDE_DIR} ${OpenSSL_INCLUDE_DIR})
+  bro_plugin_begin(BRO KAFKA)
   bro_plugin_cc(src/KafkaWriter.cc)
-  bro_plugin_cc(src/MetronJSON.cc)
   bro_plugin_cc(src/Plugin.cc)
+  bro_plugin_cc(src/TaggedJSON.cc)
   bro_plugin_bif(src/kafka.bif)
   bro_plugin_dist_files(README CHANGES COPYING VERSION)
   bro_plugin_link_library(${LibRDKafka_LIBRARIES})
   bro_plugin_link_library(${LibRDKafka_C_LIBRARIES})
+  bro_plugin_link_library(${OpenSSL_LIBRARIES})
   bro_plugin_end()
 
-else ()
+elseif (NOT LIBRDKAFKA_FOUND)
   message(FATAL_ERROR "LibRDKafka not found.")
+
+elseif (NOT OPENSSL_FOUND)
+  message(FATAL_ERROR "OpenSSL not found.")
+
 endif ()

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/MAINTAINER
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/MAINTAINER b/bro-plugin-kafka/MAINTAINER
new file mode 100644
index 0000000..0ddaed7
--- /dev/null
+++ b/bro-plugin-kafka/MAINTAINER
@@ -0,0 +1,18 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+Apache Metron <us...@metron.incubator.apache.org>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/Makefile
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/Makefile b/bro-plugin-kafka/Makefile
index e4157c9..50fa3ca 100644
--- a/bro-plugin-kafka/Makefile
+++ b/bro-plugin-kafka/Makefile
@@ -1,20 +1,18 @@
 #
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
 #
 # Convenience Makefile providing a few common top-level targets.
 #

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/README
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/README b/bro-plugin-kafka/README
new file mode 100644
index 0000000..c672bd0
--- /dev/null
+++ b/bro-plugin-kafka/README
@@ -0,0 +1,96 @@
+
+===============================
+Writing Logging Output to Kafka
+===============================
+
+A log writer that sends logging output to Kafka.  This provides a convenient
+means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to
+process the data generated by Bro.
+
+.. contents::
+
+Installation
+------------
+
+Install librdkafka (https://github.com/edenhill/librdkafka), a native client
+library for Kafka.  This plugin has been tested against the latest release of
+librdkafka, which at the time of this writing is v0.8.6.
+
+    # curl -L https://github.com/edenhill/librdkafka/archive/0.8.6.tar.gz | tar xvz
+    # cd librdkafka-0.8.6/
+    # ./configure
+    # make
+    # sudo make install
+
+Then compile this Bro plugin using the following commands.
+
+    # ./configure --bro-dist=$BRO_SRC
+    # make
+    # sudo make install
+
+Run the following command to ensure that the plugin was installed successfully.
+
+    # bro -N Bro::Kafka
+    Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1)
+
+Activation
+----------
+
+The easiest way to enable Kafka output is to load the plugin's
+``logs-to-kafka.bro`` script.  If you are using BroControl, the following lines
+added to local.bro will activate it.
+
+.. console::
+
+    @load Bro/Kafka/logs-to-kafka.bro
+    redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
+    redef Kafka::topic_name = "bro";
+    redef Kafka::kafka_conf = table(
+        ["metadata.broker.list"] = "localhost:9092"
+    );
+
+This example will send all HTTP, DNS, and Conn logs to a topic called ``bro`` on
+a Kafka broker running on localhost. Any configuration value accepted by
+librdkafka can be added to the ``kafka_conf`` configuration table.
+
+Settings
+--------
+
+``kafka_conf``
+
+The global configuration settings for Kafka.  These values are passed through
+directly to librdkafka.  Any valid librdkafka settings can be defined in this
+table.
+
+.. console::
+
+    redef Kafka::kafka_conf = table(
+        ["metadata.broker.list"] = "localhost:9092",
+        ["client.id"] = "bro"
+    );
+
+``topic_name``
+
+The name of the Kafka topic to which all Bro logs will be sent.
+
+.. console::
+
+    redef Kafka::topic_name = "bro";
+
+``max_wait_on_shutdown``
+
+The maximum number of milliseconds that the plugin will wait for any backlog of
+queued messages to be sent to Kafka before forced shutdown.
+
+.. console::
+
+    redef Kafka::max_wait_on_shutdown = 3000;
+
+``tag_json``
+
+If true, each JSON-formatted message is nested under its log stream identifier.
+For example, a Conn::LOG message will look like ``{ 'conn' : { ... }}``.
+
+.. console::
+
+    redef Kafka::tag_json = T;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/README.md
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/README.md b/bro-plugin-kafka/README.md
deleted file mode 100644
index 6e965c5..0000000
--- a/bro-plugin-kafka/README.md
+++ /dev/null
@@ -1,88 +0,0 @@
-
-[Bro](https://www.bro.org/) Logging with Kafka
-==============================================
-
-A Bro Log filter that sends log data to [Kafka](http://kafka.apache.org/).  This
-provides a convenient means for tools in the Hadoop ecosystem, such as [Storm](http://storm.apache.org/),
-[Spark](http://spark.apache.org/), and others, to process the log data generated by Bro.
-
-Installation
-------------
-
-Install [librdkafka](https://github.com/edenhill/librdkafka), a native client
-library for Kafka.
-
-Download and compile Bro from source code.  Replace version 2.4.1 with the
-most recent stable release.  See the INSTALL documentation within the Bro release
-for more details on prerequisites and compilation.
-
-```
-curl https://www.bro.org/downloads/release/bro-2.4.1.tar.gz | tar -xvz
-cd bro-2.4.1
-./configure
-make
-sudo make install
-export BRO_SRC=`pwd`
-```
-
-Download and compile this Kafka plugin for Bro.
-
-```
-./configure --bro-dist=$BRO_SRC
-make
-sudo make install
-```
-
-Optional: Instead of running `make install` as above, after the plugin has been
-compiled, the `build` directory can be added to the `BRO_PLUGIN_PATH`.  Bro will
-automatically load any plugins that it finds at this location.
-
-```
-export BRO_PLUGIN_PATH="/home/the-dude/bro-plugin-kafka/build/"
-```
-
-Run the following command to ensure that the plugin was installed successfully.
-The 'bro' executable must be in your PATH.
-
-```
-bro -N Metron::Kafka
-```
-
-Getting Started
----------------
-
-After installing the plugin, create a bro script that will activate it.  In most cases, this can be placed in the site/local file. In most cases this file is located at `$BRO_HOME/share/bro/site/local.bro`.
-
-This following example Bro script will direct all HTTP, DNS, and Conn logs to a Kafka broker running locally to a topic called `bro`.
-
-```
-@load Metron/Kafka/logs-to-kafka.bro
-redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
-redef Kafka::kafka_broker_list = "localhost:9092";
-redef Kafka::topic_name = "bro";
-```
-
-Next, simply start Bro.   If you are starting Bro with `broctl` after updating `local.bro` you will need to run `install` and then `start`.
-
-```
-$ /usr/local/bro/bin/broctl
-
-Welcome to BroControl 1.4
-
-Type "help" for help.
-
-[BroControl] > install
-removing old policies in /usr/local/bro/spool/installed-scripts-do-not-touch/site ...
-removing old policies in /usr/local/bro/spool/installed-scripts-do-not-touch/auto ...
-creating policy directories ...
-installing site policies ...
-generating standalone-layout.bro ...
-generating local-networks.bro ...
-generating broctl-config.bro ...
-generating broctl-config.sh ...
-updating nodes ...
-[BroControl] > start
-starting bro ...
-```
-
-The plugin does not interfere with the existing file-based logging.  To validate proper functioning compare the data being received at Kafka to the data in the local log file.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/VERSION
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/VERSION b/bro-plugin-kafka/VERSION
index 49d5957..204e5ca 100644
--- a/bro-plugin-kafka/VERSION
+++ b/bro-plugin-kafka/VERSION
@@ -1 +1,18 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
 0.1

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/cmake/FindOpenSSL.cmake
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/cmake/FindOpenSSL.cmake b/bro-plugin-kafka/cmake/FindOpenSSL.cmake
new file mode 100644
index 0000000..5ed955c
--- /dev/null
+++ b/bro-plugin-kafka/cmake/FindOpenSSL.cmake
@@ -0,0 +1,72 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+# - Try to find openssl include dirs and libraries
+#
+# Usage of this module as follows:
+#
+#     find_package(OpenSSL)
+#
+# Variables used by this module, they can change the default behaviour and need
+# to be set before calling find_package:
+#
+#  OpenSSL_ROOT_DIR          Set this variable to the root installation of
+#                            openssl if the module has problems finding the
+#                            proper installation path.
+#
+# Variables defined by this module:
+#
+#  OPENSSL_FOUND             System has openssl, include and library dirs found
+#  OpenSSL_INCLUDE_DIR       The openssl include directories.
+#  OpenSSL_LIBRARIES         The openssl libraries.
+#  OpenSSL_CRYPTO_LIBRARY    The openssl crypto library.
+#  OpenSSL_SSL_LIBRARY       The openssl ssl library.
+
+find_path(OpenSSL_ROOT_DIR
+    NAMES include/openssl/ssl.h
+)
+
+find_path(OpenSSL_INCLUDE_DIR
+    NAMES openssl/ssl.h
+    HINTS ${OpenSSL_ROOT_DIR}/include
+)
+
+find_library(OpenSSL_SSL_LIBRARY
+    NAMES ssl ssleay32 ssleay32MD
+    HINTS ${OpenSSL_ROOT_DIR}/lib
+)
+
+find_library(OpenSSL_CRYPTO_LIBRARY
+    NAMES crypto
+    HINTS ${OpenSSL_ROOT_DIR}/lib
+)
+
+set(OpenSSL_LIBRARIES ${OpenSSL_SSL_LIBRARY} ${OpenSSL_CRYPTO_LIBRARY}
+    CACHE STRING "OpenSSL SSL and crypto libraries" FORCE)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(OpenSSL DEFAULT_MSG
+    OpenSSL_LIBRARIES
+    OpenSSL_INCLUDE_DIR
+)
+
+mark_as_advanced(
+    OpenSSL_ROOT_DIR
+    OpenSSL_INCLUDE_DIR
+    OpenSSL_LIBRARIES
+    OpenSSL_CRYPTO_LIBRARY
+    OpenSSL_SSL_LIBRARY
+)

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/configure
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/configure b/bro-plugin-kafka/configure
index b496886..d053488 100755
--- a/bro-plugin-kafka/configure
+++ b/bro-plugin-kafka/configure
@@ -1,21 +1,20 @@
 #!/bin/sh
 #
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
 #
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
 #
 # Wrapper for viewing/setting options that the plugin's CMake
 # scripts will recognize.
@@ -84,15 +83,15 @@ while [ $# -ne 0 ]; do
         --help|-h)
             usage
             ;;
-
         --bro-dist=*)
             brodist=`cd $optarg && pwd`
             ;;
-
         --install-root=*)
             installroot=$optarg
             ;;
-
+        --with-openssl=*)
+            append_cache_entry OpenSSL_ROOT_DIR PATH $optarg
+            ;;
         *)
             if type plugin_option >/dev/null 2>&1; then
                 plugin_option $1 && shift && continue;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/configure.plugin
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/configure.plugin b/bro-plugin-kafka/configure.plugin
index f5cb384..1cb2086 100644
--- a/bro-plugin-kafka/configure.plugin
+++ b/bro-plugin-kafka/configure.plugin
@@ -1,5 +1,6 @@
 #!/bin/sh
 #
+#
 #  Licensed to the Apache Software Foundation (ASF) under one or more
 #  contributor license agreements.  See the NOTICE file distributed with
 #  this work for additional information regarding copyright ownership.
@@ -22,6 +23,7 @@ plugin_usage()
 {
   cat <<EOF
   --with-librdkafka=PATH	 path to librdkafka
+  --with-openssl=PATH      path to OpenSSL install root
 EOF
 }
 
@@ -30,8 +32,10 @@ plugin_option()
   case "$1" in
     --with-librdkafka=*)
       append_cache_entry LibRdKafka_ROOT_DIR PATH $optarg
-    ;;
-
+      ;;
+    --with-openssl=*)
+      append_cache_entry OpenSSL_ROOT_DIR PATH $optarg
+      ;;
     *)
       return 1;
     ;;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro b/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
new file mode 100644
index 0000000..12295a9
--- /dev/null
+++ b/bro-plugin-kafka/scripts/Bro/Kafka/__load__.bro
@@ -0,0 +1,19 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+# This is loaded when a user activates the plugin. Include scripts here that should be
+# loaded automatically at that point.
+#

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro b/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
new file mode 100644
index 0000000..84e390c
--- /dev/null
+++ b/bro-plugin-kafka/scripts/Bro/Kafka/logs-to-kafka.bro
@@ -0,0 +1,44 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+##! load this script to enable log output to kafka
+
+module Kafka;
+
+export {
+	##
+	## which log streams should be sent to kafka?
+	## example:
+	##		redef Kafka::logs_to_send = set(Conn::LOG, HTTP::LOG, DNS::LOG);
+	##
+	const logs_to_send: set[Log::ID] &redef;
+}
+
+event bro_init() &priority=-5
+{
+	for (stream_id in Log::active_streams)
+	{
+		if (stream_id in Kafka::logs_to_send)
+		{
+			local filter: Log::Filter = [
+				$name = fmt("kafka-%s", stream_id),
+				$writer = Log::WRITER_KAFKAWRITER
+			];
+
+			Log::add_filter(stream_id, filter);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/Metron/Kafka/__load__.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/Metron/Kafka/__load__.bro b/bro-plugin-kafka/scripts/Metron/Kafka/__load__.bro
deleted file mode 100644
index 36913d3..0000000
--- a/bro-plugin-kafka/scripts/Metron/Kafka/__load__.bro
+++ /dev/null
@@ -1,21 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-
-#
-# This is loaded when a user activates the plugin. Include scripts here that should be
-# loaded automatically at that point.
-#

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/Metron/Kafka/logs-to-kafka.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/Metron/Kafka/logs-to-kafka.bro b/bro-plugin-kafka/scripts/Metron/Kafka/logs-to-kafka.bro
deleted file mode 100644
index 5d0af0c..0000000
--- a/bro-plugin-kafka/scripts/Metron/Kafka/logs-to-kafka.bro
+++ /dev/null
@@ -1,48 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-
-##! load this script to enable log output to kafka
-
-module Kafka;
-
-export {
-	##
-	## which log streams should be sent to kafka?
-	## example:
-	##		redef Kafka::logs_to_send = set(Conn::Log, HTTP::LOG, DNS::LOG);
-	##
-	const logs_to_send: set[Log::ID] &redef;
-}
-
-event bro_init() &priority=-5 {
-	if(kafka_broker_list == "" || topic_name == "") {
-		return;
-	}
-
-	for (stream_id in Log::active_streams) {
-		if (stream_id in Kafka::logs_to_send) {
-			local stream_str = fmt("%s", stream_id);
-
-			local filter: Log::Filter = [
-				$name = fmt("kafka-%s", stream_str),
-				$writer = Log::WRITER_KAFKAWRITER
-			];
-
-			Log::add_filter(stream_id, filter);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/__load__.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/__load__.bro b/bro-plugin-kafka/scripts/__load__.bro
index e3db306..fee9549 100644
--- a/bro-plugin-kafka/scripts/__load__.bro
+++ b/bro-plugin-kafka/scripts/__load__.bro
@@ -14,8 +14,6 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 #
-
-#
 # This is loaded unconditionally at Bro startup. Include scripts here that should
 # always be loaded.
 #

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/scripts/init.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/scripts/init.bro b/bro-plugin-kafka/scripts/init.bro
index 9515f88..c76b2a6 100644
--- a/bro-plugin-kafka/scripts/init.bro
+++ b/bro-plugin-kafka/scripts/init.bro
@@ -18,19 +18,10 @@
 module Kafka;
 
 export {
-  #
-  # the kafka broker(s) to which data will be sent
-  #
-  const kafka_broker_list: string = "localhost:9092" &redef;
-
-  #
-  # the name of the kafka topic
-  #
   const topic_name: string = "bro" &redef;
-
-  #
-  # the maximum amount of time in milliseconds to wait for
-  # kafka message delivery upon shutdown
-  #
-  const max_wait_on_delivery: count = 3000 &redef;
+  const max_wait_on_shutdown: count = 3000 &redef;
+  const tag_json: bool = F &redef;
+  const kafka_conf: table[string] of string = table(
+    ["metadata.broker.list"] = "localhost:9092"
+  ) &redef;
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/KafkaWriter.cc
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/KafkaWriter.cc b/bro-plugin-kafka/src/KafkaWriter.cc
index 1e38f7f..9019790 100644
--- a/bro-plugin-kafka/src/KafkaWriter.cc
+++ b/bro-plugin-kafka/src/KafkaWriter.cc
@@ -14,58 +14,61 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
-#include <librdkafka/rdkafkacpp.h>
-#include <logging/WriterBackend.h>
-#include <logging/WriterFrontend.h>
+
+#include <Type.h>
+#include <threading/Formatter.h>
 #include <threading/formatters/JSON.h>
-#include "KafkaWriter.h"
 #include "kafka.bif.h"
-#include "MetronJSON.h"
-
-using metron::kafka::KafkaWriter;
-using logging::WriterBackend;
-using logging::WriterFrontend;
-using threading::Value;
-using threading::Field;
-using threading::formatter::JSON;
-using metron::formatter::MetronJSON;
-
-KafkaWriter::KafkaWriter(WriterFrontend* frontend)
-    : WriterBackend(frontend)
-    , formatter(NULL)
-    , producer(NULL)
-    , topic(NULL)
-{
-    // kafka broker setting
-    kafka_broker_list.assign(
-        (const char*)BifConst::Kafka::kafka_broker_list->Bytes(),
-        BifConst::Kafka::kafka_broker_list->Len());
+#include "TaggedJSON.h"
+#include "KafkaWriter.h"
 
-    // topic name setting
+using namespace logging;
+using namespace writer;
+
+KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL)
+{
+    // TODO do we need this??
     topic_name.assign((const char*)BifConst::Kafka::topic_name->Bytes(),
         BifConst::Kafka::topic_name->Len());
-
-    // max wait for queued messages to send on shutdown
-    max_wait_on_delivery = BifConst::Kafka::max_wait_on_delivery;
 }
 
-KafkaWriter::~KafkaWriter() {}
+KafkaWriter::~KafkaWriter()
+{}
 
-bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields,
-    const Field* const* fields)
+bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
 {
     // initialize the formatter
-    // 'info.path' indicates the log stream type; aka HTTP::LOG, DNS::LOG
-    delete formatter;
-    formatter = new MetronJSON(info.path, this, JSON::TS_EPOCH);
+    if(BifConst::Kafka::tag_json) {
+      formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
+    } else {
+      formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
+    }
 
     // kafka global configuration
     string err;
     conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
-    if (RdKafka::Conf::CONF_OK != conf->set("metadata.broker.list", kafka_broker_list, err)) {
-        reporter->Error("Failed to set metatdata.broker.list: %s", err.c_str());
-        return false;
+
+    // apply the user-defined settings to kafka
+    Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
+    IterCookie* c = val->AsTable()->InitForIteration();
+    HashKey* k;
+    TableEntryVal* v;
+    while ((v = val->AsTable()->NextEntry(k, c))) {
+
+        // fetch the key and value
+        ListVal* index = val->AsTableVal()->RecoverIndex(k);
+        string key = index->Index(0)->AsString()->CheckString();
+        string val = v->Value()->AsString()->CheckString();
+
+        // apply setting to kafka
+        if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+            reporter->Error("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str());
+            return false;
+        }
+
+        // cleanup
+        Unref(index);
+        delete k;
     }
 
     // create kafka producer
@@ -94,13 +97,14 @@ bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields,
 bool KafkaWriter::DoFinish(double network_time)
 {
     bool success = false;
-    int interval = 1000;
+    int poll_interval = 1000;
     int waited = 0;
+    int max_wait = BifConst::Kafka::max_wait_on_shutdown;
 
     // wait a bit for queued messages to be delivered
-    while (producer->outq_len() > 0 && waited < max_wait_on_delivery) {
-        producer->poll(interval);
-        waited += interval;
+    while (producer->outq_len() > 0 && waited <= max_wait) {
+        producer->poll(poll_interval);
+        waited += poll_interval;
     }
 
     // successful only if all messages delivered
@@ -111,6 +115,7 @@ bool KafkaWriter::DoFinish(double network_time)
 
     delete topic;
     delete producer;
+    delete formatter;
 
     return success;
 }
@@ -119,8 +124,7 @@ bool KafkaWriter::DoFinish(double network_time)
  * Writer-specific output method implementing recording of one log
  * entry.
  */
-bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields,
-    threading::Value** vals)
+bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals)
 {
     ODesc buff;
     buff.Clear();
@@ -180,11 +184,10 @@ bool KafkaWriter::DoFlush(double network_time)
  * FinishedRotation() to signal the log manager that potential
  * postprocessors can now run.
  */
-bool KafkaWriter::DoRotate(const char* rotated_path, double open, double close,
-    bool terminating)
+bool KafkaWriter::DoRotate(const char* rotated_path, double open, double close, bool terminating)
 {
     // no need to perform log rotation
-    return true;
+    return FinishedRotation();
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/KafkaWriter.h
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/KafkaWriter.h b/bro-plugin-kafka/src/KafkaWriter.h
index d63e2dd..2299667 100644
--- a/bro-plugin-kafka/src/KafkaWriter.h
+++ b/bro-plugin-kafka/src/KafkaWriter.h
@@ -14,63 +14,53 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
-#ifndef BRO_PLUGIN_METRON_KAFKA_KAFKAWRITER_H
-#define BRO_PLUGIN_METRON_KAFKA_KAFKAWRITER_H
 
-#include <logging/WriterBackend.h>
-#include <logging/WriterFrontend.h>
-#include <threading/Formatter.h>
+#ifndef BRO_PLUGIN_BRO_KAFKA_KAFKAWRITER_H
+#define BRO_PLUGIN_BRO_KAFKA_KAFKAWRITER_H
+
+#include <string>
 #include <librdkafka/rdkafkacpp.h>
+#include <logging/WriterBackend.h>
+#include <threading/formatters/JSON.h>
+#include <Type.h>
+#include "kafka.bif.h"
 
-using logging::WriterBackend;
-using logging::WriterFrontend;
-using threading::Value;
-using threading::Field;
-using threading::formatter::Formatter;
+#include "TaggedJSON.h"
 
-namespace metron {
-namespace kafka {
+namespace logging { namespace writer {
 
-    /**
+/**
  * A logging writer that sends data to a Kafka broker.
  */
-    class KafkaWriter : public WriterBackend {
-    public:
-        KafkaWriter(WriterFrontend* frontend);
-        ~KafkaWriter();
+class KafkaWriter : public WriterBackend {
+
+public:
+    KafkaWriter(WriterFrontend* frontend);
+    ~KafkaWriter();
 
-        static WriterBackend* Instantiate(WriterFrontend* frontend)
-        {
-            return new KafkaWriter(frontend);
-        }
+    static WriterBackend* Instantiate(WriterFrontend* frontend)
+    {
+        return new KafkaWriter(frontend);
+    }
 
-    protected:
-        virtual bool DoInit(const WriterBackend::WriterInfo& info, int num_fields,
-            const threading::Field* const* fields);
-        virtual bool DoWrite(int num_fields, const threading::Field* const* fields,
-            threading::Value** vals);
-        virtual bool DoSetBuf(bool enabled);
-        virtual bool DoRotate(const char* rotated_path, double open, double close,
-            bool terminating);
-        virtual bool DoFlush(double network_time);
-        virtual bool DoFinish(double network_time);
-        virtual bool DoHeartbeat(double network_time, double current_time);
+protected:
+    virtual bool DoInit(const WriterBackend::WriterInfo& info, int num_fields, const threading::Field* const* fields);
+    virtual bool DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals);
+    virtual bool DoSetBuf(bool enabled);
+    virtual bool DoRotate(const char* rotated_path, double open, double close, bool terminating);
+    virtual bool DoFlush(double network_time);
+    virtual bool DoFinish(double network_time);
+    virtual bool DoHeartbeat(double network_time, double current_time);
 
-    private:
-        // values defined within 'bro-space'; must match kafka.bif, scripts/init.bro
-        string kafka_broker_list;
-        string topic_name;
-        int max_wait_on_delivery;
+private:
+    string topic_name;
+    threading::formatter::Formatter *formatter;
+    RdKafka::Producer* producer;
+    RdKafka::Topic* topic;
+    RdKafka::Conf* conf;
+    RdKafka::Conf* topic_conf;
+};
 
-        // other
-        Formatter* formatter;
-        RdKafka::Producer* producer;
-        RdKafka::Topic* topic;
-        RdKafka::Conf* conf;
-        RdKafka::Conf* topic_conf;
-    };
-}
-}
+}}
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/MetronJSON.cc
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/MetronJSON.cc b/bro-plugin-kafka/src/MetronJSON.cc
deleted file mode 100644
index ec84bec..0000000
--- a/bro-plugin-kafka/src/MetronJSON.cc
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-#include <threading/Formatter.h>
-#include "MetronJSON.h"
-
-using metron::formatter::MetronJSON;
-using threading::formatter::JSON;
-using threading::MsgThread;
-using threading::Field;
-using threading::Value;
-
-MetronJSON::MetronJSON(string sn, MsgThread* t, TimeFormat tf)
-    : JSON(t, tf)
-    , stream_name(sn)
-{
-}
-
-MetronJSON::~MetronJSON() {}
-
-bool MetronJSON::Describe(ODesc* desc, int num_fields,
-    const Field* const* fields, Value** vals) const
-{
-    desc->AddRaw("{");
-
-    // prepend the stream name
-    desc->AddRaw("\"");
-    desc->AddRaw(stream_name);
-    desc->AddRaw("\": ");
-
-    // append the JSON formatted log record itself
-    JSON::Describe(desc, num_fields, fields, vals);
-
-    desc->AddRaw("}");
-    return true;
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/MetronJSON.h
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/MetronJSON.h b/bro-plugin-kafka/src/MetronJSON.h
deleted file mode 100644
index b9d9185..0000000
--- a/bro-plugin-kafka/src/MetronJSON.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-#ifndef BRO_PLUGIN_METRON_KAFKA_METRONJSON_H
-#define BRO_PLUGIN_METRON_KAFKA_METRONJSON_H
-
-#include <string>
-#include <threading/Formatter.h>
-#include <threading/formatters/JSON.h>
-
-using threading::Field;
-using threading::Value;
-using threading::formatter::JSON;
-
-namespace metron {
-namespace formatter {
-
- /**
-  * A formatter that produces bro records in a format accepted by
-  * Metron. Specifically, the stream ID is prepended to each JSON
-  * formatted log record.
-  *
-  * {"conn": { ... }}
-  * {"dns":  { ... }}
-  */
-    class MetronJSON : public JSON {
-
-    public:
-        MetronJSON(string stream_name, threading::MsgThread* t, TimeFormat tf);
-        virtual ~MetronJSON();
-        virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields,
-            Value** vals) const;
-
-    private:
-        string stream_name;
-    };
-}
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/Plugin.cc
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/Plugin.cc b/bro-plugin-kafka/src/Plugin.cc
index f9ba7fb..d523d23 100644
--- a/bro-plugin-kafka/src/Plugin.cc
+++ b/bro-plugin-kafka/src/Plugin.cc
@@ -14,25 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 #include "Plugin.h"
 #include "KafkaWriter.h"
 
-namespace plugin {
-namespace Metron_Kafka {
+namespace plugin { namespace Bro_Kafka {
     Plugin plugin;
-}
-}
+}}
 
-using namespace plugin::Metron_Kafka;
+using namespace plugin::Bro_Kafka;
 
 plugin::Configuration Plugin::Configure()
 {
-    AddComponent(new ::logging::Component(
-        "KafkaWriter", ::metron::kafka::KafkaWriter::Instantiate));
+    AddComponent(new ::logging::Component("KafkaWriter", ::logging::writer::KafkaWriter::Instantiate));
 
     plugin::Configuration config;
-    config.name = "Metron::Kafka";
+    config.name = "Bro::Kafka";
     config.description = "Writes logs to Kafka";
     config.version.major = 0;
     config.version.minor = 1;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/Plugin.h
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/Plugin.h b/bro-plugin-kafka/src/Plugin.h
index 4313a3c..8adeb18 100644
--- a/bro-plugin-kafka/src/Plugin.h
+++ b/bro-plugin-kafka/src/Plugin.h
@@ -14,14 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
-#ifndef BRO_PLUGIN_METRON_KAFKA
-#define BRO_PLUGIN_METRON_KAFKA
+
+#ifndef BRO_PLUGIN_BRO_KAFKA
+#define BRO_PLUGIN_BRO_KAFKA
 
 #include <plugin/Plugin.h>
 
-namespace plugin {
-namespace Metron_Kafka {
+namespace plugin { namespace Bro_Kafka {
 
     class Plugin : public ::plugin::Plugin {
     protected:
@@ -30,7 +29,6 @@ namespace Metron_Kafka {
     };
 
     extern Plugin plugin;
-}
-}
+}}
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/TaggedJSON.cc
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/TaggedJSON.cc b/bro-plugin-kafka/src/TaggedJSON.cc
new file mode 100644
index 0000000..db3f305
--- /dev/null
+++ b/bro-plugin-kafka/src/TaggedJSON.cc
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TaggedJSON.h"
+
+namespace threading { namespace formatter {
+
+TaggedJSON::TaggedJSON(string sn, MsgThread* t, JSON::TimeFormat tf): JSON(t, tf), stream_name(sn)
+{}
+
+TaggedJSON::~TaggedJSON()
+{}
+
+bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const
+{
+    desc->AddRaw("{");
+
+    // 'tag' the json; aka prepend the stream name to the json-formatted log content
+    desc->AddRaw("\"");
+    desc->AddRaw(stream_name);
+    desc->AddRaw("\": ");
+
+    // append the JSON formatted log record itself
+    JSON::Describe(desc, num_fields, fields, vals);
+
+    desc->AddRaw("}");
+    return true;
+}
+}}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/TaggedJSON.h
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/TaggedJSON.h b/bro-plugin-kafka/src/TaggedJSON.h
new file mode 100644
index 0000000..08a50df
--- /dev/null
+++ b/bro-plugin-kafka/src/TaggedJSON.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BRO_PLUGIN_BRO_KAFKA_TAGGEDJSON_H
+#define BRO_PLUGIN_BRO_KAFKA_TAGGEDJSON_H
+
+#include <string>
+#include <threading/Formatter.h>
+#include <threading/formatters/JSON.h>
+
+using threading::formatter::JSON;
+using threading::MsgThread;
+using threading::Value;
+using threading::Field;
+
+namespace threading { namespace formatter {
+
+/*
+ * A JSON formatter that prepends or 'tags' the content with a log stream
+ * identifier.  For example,
+ *   {"conn": { ... }}
+ *   {"http": { ... }}
+ */
+class TaggedJSON : public JSON {
+
+public:
+    TaggedJSON(string stream_name, MsgThread* t, JSON::TimeFormat tf);
+    virtual ~TaggedJSON();
+    virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const;
+
+private:
+    string stream_name;
+};
+
+}}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/kafka.bif
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/kafka.bif b/bro-plugin-kafka/src/kafka.bif
index a95937f..8a8070c 100644
--- a/bro-plugin-kafka/src/kafka.bif
+++ b/bro-plugin-kafka/src/kafka.bif
@@ -17,6 +17,7 @@
 
 module Kafka;
 
-const kafka_broker_list: string;
+const kafka_conf: config;
 const topic_name: string;
-const max_wait_on_delivery: count;
+const max_wait_on_shutdown: count;
+const tag_json: bool;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/src/kafka_const.bif
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/src/kafka_const.bif b/bro-plugin-kafka/src/kafka_const.bif
new file mode 100644
index 0000000..989c0ae
--- /dev/null
+++ b/bro-plugin-kafka/src/kafka_const.bif
@@ -0,0 +1,20 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+module Kafka;
+
+type config : table[string] of string;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/tests/Scripts/get-bro-env
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/tests/Scripts/get-bro-env b/bro-plugin-kafka/tests/Scripts/get-bro-env
index 80cfb11..8aa0ea7 100755
--- a/bro-plugin-kafka/tests/Scripts/get-bro-env
+++ b/bro-plugin-kafka/tests/Scripts/get-bro-env
@@ -15,10 +15,8 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 #
-
-#
 # BTest helper for getting values for Bro-related environment variables.
-#
+
 base=`dirname $0`
 bro=`cat ${base}/../../build/CMakeCache.txt | grep BRO_DIST | cut -d = -f 2`
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/bro-plugin-kafka/tests/kafka/show-plugin.bro
----------------------------------------------------------------------
diff --git a/bro-plugin-kafka/tests/kafka/show-plugin.bro b/bro-plugin-kafka/tests/kafka/show-plugin.bro
index b305382..4e8dd6a 100644
--- a/bro-plugin-kafka/tests/kafka/show-plugin.bro
+++ b/bro-plugin-kafka/tests/kafka/show-plugin.bro
@@ -15,5 +15,5 @@
 #  limitations under the License.
 #
 
-# @TEST-EXEC: bro -NN Metron::Kafka >output
+# @TEST-EXEC: bro -NN Bro::Kafka >output
 # @TEST-EXEC: btest-diff output

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/amazon-ec2/.gitignore
----------------------------------------------------------------------
diff --git a/deployment/amazon-ec2/.gitignore b/deployment/amazon-ec2/.gitignore
index e066c7b..38b03a9 100644
--- a/deployment/amazon-ec2/.gitignore
+++ b/deployment/amazon-ec2/.gitignore
@@ -1,2 +1,3 @@
 *.pem
 *.secret
+*.log

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/amazon-ec2/README.md
----------------------------------------------------------------------
diff --git a/deployment/amazon-ec2/README.md b/deployment/amazon-ec2/README.md
index 63f0c89..c04f39c 100644
--- a/deployment/amazon-ec2/README.md
+++ b/deployment/amazon-ec2/README.md
@@ -43,11 +43,21 @@ Each of the provisioned hosts will be externally accessible from the internet at
 ssh centos@ec2-52-91-215-174.compute-1.amazonaws.com
 ```
 
-Multiple Environments
----------------------
+Usage
+-----
+
+### Multiple Environments
 
 This process can support provisioning of multiple, isolated environments.  Simply change the `env` settings in `conf/defaults.yml`.  For example, you might provision separate development, test, and production environments.
 
 ```
 env: metron-test
 ```
+
+### Selective Provisioning
+
+To provision only a subset of the Metron deployment, specify Ansible tags.  For example, to deploy only the sensors in an Amazon EC2 environment, run the following command.
+
+```
+ansible-playbook -i ec2.py playbook.yml --tags "ec2,sensors"
+```

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/amazon-ec2/ansible.cfg
----------------------------------------------------------------------
diff --git a/deployment/amazon-ec2/ansible.cfg b/deployment/amazon-ec2/ansible.cfg
index e79770c..c8f26c4 100644
--- a/deployment/amazon-ec2/ansible.cfg
+++ b/deployment/amazon-ec2/ansible.cfg
@@ -21,6 +21,7 @@ roles_path = ../roles
 pipelining = True
 remote_user = centos
 forks = 20
+log_path = ./ansible.log
 
 # fix for "ssh throws 'unix domain socket too long' " problem
 [ssh_connection]

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/amazon-ec2/playbook.yml
----------------------------------------------------------------------
diff --git a/deployment/amazon-ec2/playbook.yml b/deployment/amazon-ec2/playbook.yml
index b269cfa..b7d9cf3 100644
--- a/deployment/amazon-ec2/playbook.yml
+++ b/deployment/amazon-ec2/playbook.yml
@@ -32,6 +32,8 @@
     - include: tasks/create-hosts.yml host_count=1 host_type=ambari_slave,enrichment,metron,ec2
     - include: tasks/create-hosts.yml host_count=3 host_type=search,metron,ec2
     - include: tasks/create-hosts.yml host_count=1 host_type=web,mysql,metron,ec2
+  tags:
+    - ec2
 
 #
 # mount additional data volumes on all ec2 hosts
@@ -42,6 +44,8 @@
     - include: tasks/mount-volume.yml vol_src=/dev/xvdb vol_mnt=/data1
     - include: tasks/mount-volume.yml vol_src=/dev/xvdc vol_mnt=/data2
     - include: tasks/expand-volume.yml volume=/dev/xvda
+  tags:
+    - ec2
 
 #
 # build the metron cluster

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/playbooks/ambari_install.yml
----------------------------------------------------------------------
diff --git a/deployment/playbooks/ambari_install.yml b/deployment/playbooks/ambari_install.yml
index 68d067e..e6d226b 100644
--- a/deployment/playbooks/ambari_install.yml
+++ b/deployment/playbooks/ambari_install.yml
@@ -18,8 +18,9 @@
 - hosts: ec2
   become: true
   tasks:
-  - debug: msg="Detected EC2 - including defaults"
-  - include_vars: ../amazon-ec2/conf/defaults.yml
+    - include_vars: ../amazon-ec2/conf/defaults.yml
+  tags:
+    - ec2
 
 - hosts: ambari_*
   become: true

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/playbooks/metron_install.yml
----------------------------------------------------------------------
diff --git a/deployment/playbooks/metron_install.yml b/deployment/playbooks/metron_install.yml
index daa512a..5c4e085 100644
--- a/deployment/playbooks/metron_install.yml
+++ b/deployment/playbooks/metron_install.yml
@@ -18,9 +18,10 @@
 - hosts: ec2
   become: true
   tasks:
-  - debug: msg="Detected EC2 - including defaults"
-  - include_vars: ../amazon-ec2/conf/defaults.yml
-
+    - include_vars: ../amazon-ec2/conf/defaults.yml
+  tags:
+    - ec2
+    
 - hosts: metron
   become: true
   roles:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/roles/bro/tasks/bro-plugin-kafka.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/bro/tasks/bro-plugin-kafka.yml b/deployment/roles/bro/tasks/bro-plugin-kafka.yml
index 8e953ba..435bc4e 100644
--- a/deployment/roles/bro/tasks/bro-plugin-kafka.yml
+++ b/deployment/roles/bro/tasks/bro-plugin-kafka.yml
@@ -15,7 +15,7 @@
 #  limitations under the License.
 #
 ---
-- name: Distribute bro plugin
+- name: Distribute bro-kafka plugin
   copy: src=../../../bro-plugin-kafka dest=/tmp mode=0755
 
 - name: Compile and install the plugin
@@ -24,16 +24,18 @@
     chdir: "/tmp/bro-plugin-kafka"
     creates: /usr/local/bro/lib/bro/plugins/METRON_KAFKA
   with_items:
+    - rm -rf build/
     - "./configure --bro-dist=/tmp/bro-{{ bro_version }}"
     - make
     - make install
 
-- name: Configure bro plugin
+- name: Configure bro-kafka plugin
   lineinfile:
     dest: /usr/local/bro/share/bro/site/local.bro
     line: "{{ item }}"
   with_items:
-    - "@load Metron/Kafka/logs-to-kafka.bro"
+    - "@load Bro/Kafka/logs-to-kafka.bro"
     - "redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);"
-    - "redef Kafka::kafka_broker_list = \"{{ kafka_broker_url }}\";"
     - "redef Kafka::topic_name = \"{{ bro_topic }}\";"
+    - "redef Kafka::tag_json = T;"
+    - "redef Kafka::kafka_conf = table([\"metadata.broker.list\"] = \"{{ kafka_broker_url }}\");"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/roles/bro/tasks/librdkafka.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/bro/tasks/librdkafka.yml b/deployment/roles/bro/tasks/librdkafka.yml
index 561bae2..925c18f 100644
--- a/deployment/roles/bro/tasks/librdkafka.yml
+++ b/deployment/roles/bro/tasks/librdkafka.yml
@@ -17,22 +17,23 @@
 ---
 - name: Download librdkafka
   get_url:
-    url: https://github.com/edenhill/librdkafka/archive/v0.9.0.tar.gz
-    dest: /tmp/librdkafka-0.9.0.tar.gz
+    url: "{{ librdkafka_url }}"
+    dest: "/tmp/librdkafka-{{ librdkafka_version }}.tar.gz"
 
 - name: Extract librdkafka tarball
   unarchive:
-    src: /tmp/librdkafka-0.9.0.tar.gz
+    src: "/tmp/librdkafka-{{ librdkafka_version }}.tar.gz"
     dest: /tmp
     copy: no
-    creates: /tmp/librdkafka-0.9.0
+    creates: "/tmp/librdkafka-{{ librdkafka_version }}"
 
 - name: Compile and install librdkafka
   shell: "{{ item }}"
   args:
-    chdir: /tmp/librdkafka-0.9.0
+    chdir: "/tmp/librdkafka-{{ librdkafka_version }}"
     creates: /usr/local/lib/librdkafka.so
   with_items:
+    - rm -rf build/
     - ./configure
     - make
     - make install

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/deployment/roles/bro/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/bro/vars/main.yml b/deployment/roles/bro/vars/main.yml
index 88e6f64..8141253 100644
--- a/deployment/roles/bro/vars/main.yml
+++ b/deployment/roles/bro/vars/main.yml
@@ -17,3 +17,5 @@
 ---
 bro_version: 2.4.1
 bro_topic: bro
+librdkafka_version: 0.8.6
+librdkafka_url: https://github.com/edenhill/librdkafka/archive/0.8.6.tar.gz

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/75666533/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d8ac9d1..d2949e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,32 +29,34 @@
 	<description>Performs release auditing for Metron.</description>
 	<url>https://metron.incubator.apache.org/</url>
 	<build>
-    <plugins>
+		<plugins>
 			<plugin>
 				<groupId>org.apache.rat</groupId>
 				<artifactId>apache-rat-plugin</artifactId>
 				<version>0.11</version>
 				<configuration>
-					<excludes>
-						<exclude>**/README.md</exclude>
-						<exclude>**/VERSION</exclude>
-						<exclude>**/*.json</exclude>
-						<exclude>**/*.log</exclude>
-						<exclude>**/*.ldif</exclude>
-						<exclude>**/*.template</exclude>
-						<exclude>**/.*</exclude>
-						<exclude>**/.*/**</exclude>
-						<exclude>**/*.seed</exclude>
-						<exclude>**/*.iml</exclude>
-						<exclude>**/ansible.cfg</exclude>
-						<exclude>site/**</exclude>
-						<exclude>metron-ui/lib/public/**</exclude>
-						<exclude>**/src/main/resources/patterns/**</exclude>
-						<exclude>**/src/test/resources/**</exclude>
-						<exclude>**/src/main/resources/Sample*/**</exclude>
-						<exclude>**/dependency-reduced-pom.xml</exclude>
-					        <exclude>**/files/opensoc-ui</exclude>
-					</excludes>
+				<excludes>
+					<exclude>**/README.md</exclude>
+					<exclude>**/VERSION</exclude>
+					<exclude>**/*.json</exclude>
+					<exclude>**/*.log</exclude>
+					<exclude>**/*.ldif</exclude>
+					<exclude>**/*.template</exclude>
+					<exclude>**/.*</exclude>
+					<exclude>**/.*/**</exclude>
+					<exclude>**/*.seed</exclude>
+					<exclude>**/*.iml</exclude>
+					<exclude>**/ansible.cfg</exclude>
+					<exclude>site/**</exclude>
+					<exclude>metron-ui/lib/public/**</exclude>
+					<exclude>**/src/main/resources/patterns/**</exclude>
+					<exclude>**/src/test/resources/**</exclude>
+					<exclude>**/src/main/resources/Sample*/**</exclude>
+					<exclude>**/dependency-reduced-pom.xml</exclude>
+				        <exclude>**/files/opensoc-ui</exclude>
+					<exclude>**/target/**</exclude>
+					<exclude>**/bro-plugin-kafka/build/**</exclude>
+				</excludes>
 				</configuration>
 			</plugin>
 		</plugins>