Posted to commits@metron.apache.org by ni...@apache.org on 2017/04/28 20:24:02 UTC

incubator-metron git commit: METRON-836 Use Pycapa with Kerberos (nickwallen) closes apache/incubator-metron#524

Repository: incubator-metron
Updated Branches:
  refs/heads/master f36db22eb -> 47e5aa70a


METRON-836 Use Pycapa with Kerberos (nickwallen) closes apache/incubator-metron#524


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/47e5aa70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/47e5aa70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/47e5aa70

Branch: refs/heads/master
Commit: 47e5aa70adfd2fb000e412c3a6b6d06c91e2809c
Parents: f36db22
Author: nickwallen <ni...@nickallen.org>
Authored: Fri Apr 28 16:23:29 2017 -0400
Committer: nickallen <ni...@apache.org>
Committed: Fri Apr 28 16:23:29 2017 -0400

----------------------------------------------------------------------
 metron-deployment/roles/pycapa/meta/main.yml |   1 +
 metron-sensors/README.md                     |   9 +-
 metron-sensors/pycapa/README.md              | 286 ++++++++++++++++++----
 metron-sensors/pycapa/pycapa/common.py       |   4 +
 metron-sensors/pycapa/pycapa/consumer.py     | 112 ++++++---
 metron-sensors/pycapa/pycapa/producer.py     |  97 ++++++--
 metron-sensors/pycapa/pycapa/pycapa_cli.py   | 113 +++++++--
 metron-sensors/pycapa/requirements.txt       |   2 +-
 8 files changed, 497 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/47e5aa70/metron-deployment/roles/pycapa/meta/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/pycapa/meta/main.yml b/metron-deployment/roles/pycapa/meta/main.yml
index c3d807b..a5b54b7 100644
--- a/metron-deployment/roles/pycapa/meta/main.yml
+++ b/metron-deployment/roles/pycapa/meta/main.yml
@@ -17,3 +17,4 @@
 ---
 dependencies:
   - ambari_gather_facts
+  - librdkafka

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/47e5aa70/metron-sensors/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/README.md b/metron-sensors/README.md
index af932e5..e458630 100644
--- a/metron-sensors/README.md
+++ b/metron-sensors/README.md
@@ -1,5 +1,8 @@
-# Metron Sensors
+Metron Sensors
+--------------
 
-- Fast CAPA
-- Py CAPA
+  * [`bro-plugin-kafka`](bro-plugin-kafka/): Provides integration between [Bro](https://www.bro.org/) and Kafka.  This Bro plugin sends Bro's logging output to Kafka, which gives tools in the Hadoop ecosystem, such as Storm and Spark, a convenient means to process the data generated by Bro.
 
+  * [`fastcapa`](fastcapa/): Performs fast network packet capture by leveraging Linux kernel-bypass and user-space networking technology.  The probe will bind to a network interface, capture network packets, and send the raw packet data to Kafka.  This provides a scalable mechanism for ingesting high volumes of network packet data.
+
+  * [`pycapa`](pycapa/): Performs lightweight network packet capture, retrieves network packets from Kafka, generates `libpcap`-compliant files, and enables integration with third-party tools like Wireshark.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/47e5aa70/metron-sensors/pycapa/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/README.md b/metron-sensors/pycapa/README.md
index 33e9d0a..1e8c2c5 100644
--- a/metron-sensors/pycapa/README.md
+++ b/metron-sensors/pycapa/README.md
@@ -1,74 +1,272 @@
-# Pycapa
+Pycapa
+------
 
-## Overview
+* [Overview](#overview)
+* [Installation](#installation)
+* [Usage](#usage)
+  * [Parameters](#parameters)
+  * [Examples](#examples)
+  * [Kerberos](#kerberos)
+* [FAQs](#faqs)
 
-Pycapa performs network packet capture, both off-the-wire and from Kafka, which is useful for the testing and development of [Apache Metron](https://github.com/apache/incubator-metron).  It is not intended for production use. The tool will capture packets from a specified interface and push them into a Kafka Topic.
+Overview
+========
 
-## Installation
+Pycapa performs network packet capture, both off-the-wire and from a Kafka topic, which is useful for the testing and development of [Apache Metron](https://github.com/apache/incubator-metron).  It is not intended for production use.  The tool can capture packets from a specified interface and push them into a Kafka topic.  It can also do the reverse: consume packets from Kafka and reconstruct each network packet.  The result can then be used to create a [libpcap-compliant file](https://wiki.wireshark.org/Development/LibpcapFileFormat) or fed directly into a tool like Wireshark to monitor ongoing activity.
 
-```
-pip install -r requirements.txt
-python setup.py install
-```
+Installation
+============
+
+General notes on the installation of Pycapa.
+* Python 2.7 is required.
+* The following package dependencies are required and can be installed automatically with `pip`.
+  * [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python)
+  * [pcapy](https://github.com/CoreSecurity/pcapy)
+* These instructions can be used directly on CentOS 7+.  
+* Other Linux distributions that come with Python 2.7 can use these instructions with some minor modifications.  
+* Older distributions, like CentOS 6, that come with Python 2.6 should install Python 2.7 within a virtual environment and then run Pycapa from within that virtual environment.
+
+
+1. Install system dependencies, including the core development tools, Python libraries and header files, and Libpcap libraries and header files.  On CentOS 7+, you can install these requirements with the following command.
+
+   ```
+   yum -y install "@Development tools" python-devel libpcap-devel
+   ```
+
+1. Install Librdkafka at your chosen `$PREFIX`.
+
+   ```
+   export PREFIX=/usr
+
+   wget https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz  -O - | tar -xz
+   cd librdkafka-0.9.4/
+   ./configure --prefix=$PREFIX
+   make
+   make install
+   ```
+
+1. Add Librdkafka to the dynamic library load path.
+
+    ```
+    echo "$PREFIX/lib" >> /etc/ld.so.conf.d/pycapa.conf
+    ldconfig -v
+    ```
+
+1. Install Pycapa.  This assumes that you already have the Metron source code on the host.
+
+    ```
+    cd incubator-metron/metron-sensors/pycapa
+    pip install -r requirements.txt
+    python setup.py install
+    ```
+
+Usage
+=====
+
+Pycapa has two primary runtime modes.
 
-## Usage
+* **Producer Mode**: Pycapa can capture packets from a network interface and forward those packets to a Kafka topic.  Pycapa embeds the raw network packet data in the Kafka message body.  The message key contains the timestamp indicating when the packet was captured, in microseconds from the epoch and in network byte order (see the sketch following this list).
+
+* **Consumer Mode**: Pycapa can also perform the reverse operation.  It can consume packets from Kafka and reconstruct each network packet.  This can then be used to create a [libpcap-compliant file](https://wiki.wireshark.org/Development/LibpcapFileFormat) or even to feed directly into a tool like Wireshark to monitor activity.
+
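+Both modes share this simple message format, which makes the data easy to work with from any Kafka client.  As a minimal sketch (the `decode` helper below is illustrative only, not part of Pycapa), a message produced by Pycapa can be decoded with nothing more than the Python standard library.
+
+```
+import struct
+
+def decode(msg_key, msg_value):
+    """ Decode one Pycapa Kafka message; illustrative only, not part of Pycapa. """
+    # the key is a 64-bit, big-endian (network byte order) timestamp in epoch microseconds
+    epoch_micros = struct.unpack(">Q", bytes(msg_key))[0]
+    # the value is the raw packet data, exactly as captured off the wire
+    return epoch_micros, msg_value
+```
+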
+### Parameters
 
 ```
-$ pycapa --help
-usage: pycapa [-h] [-p] [-c] [-k KAFKA_BROKERS] [-t TOPIC] [-n PACKET_COUNT]
-              [-d DEBUG] [-i INTERFACE]
+$ pycapa -h
+usage: pycapa [-h] [-p] [-c] [-k KAFKA_BROKERS] [-t KAFKA_TOPIC]
+              [-i NETWORK_IFACE] [-m MAX_PACKETS] [-pp PRETTY_PRINT]
+              [-ll LOG_LEVEL] [-X KAFKA_CONFIGS] [-s SNAPLEN]
 
 optional arguments:
   -h, --help            show this help message and exit
   -p, --producer        sniff packets and send to kafka
   -c, --consumer        read packets from kafka
-  -k KAFKA_BROKERS, --kafka KAFKA_BROKERS
+  -k KAFKA_BROKERS, --kafka-broker KAFKA_BROKERS
                         kafka broker(s)
-  -t TOPIC, --topic TOPIC
+  -t KAFKA_TOPIC, --kafka-topic KAFKA_TOPIC
                         kafka topic
-  -n PACKET_COUNT, --number PACKET_COUNT
-                        number of packets to consume
-  -d DEBUG, --debug DEBUG
-                        debug every X packets
-  -i INTERFACE, --interface INTERFACE
-                        interface to listen on
+  -i NETWORK_IFACE, --interface NETWORK_IFACE
+                        network interface to listen on
+  -m MAX_PACKETS, --max-packets MAX_PACKETS
+                        stop after this number of packets
+  -pp PRETTY_PRINT, --pretty-print PRETTY_PRINT
+                        pretty print every X packets
+  -ll LOG_LEVEL, --log-level LOG_LEVEL
+                        set the log level
+  -X KAFKA_CONFIGS      define a kafka client parameter; key=value
+  -s SNAPLEN, --snaplen SNAPLEN
+                        snapshot length
 ```
 
-Pycapa has two primary runtime modes.
+### Examples
 
-### Producer Mode
+**Example**: Capture 10 packets from the `eth0` network interface and forward those to a Kafka topic called `pcap` running on `localhost:9092`.
+  ```
+  $ pycapa --producer \
+      --interface eth0 \
+      --kafka-broker localhost:9092 \
+      --kafka-topic pcap \
+      --max-packets 10
+  INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'AWBHMIAESAHJ'}
+  INFO:root:Starting packet capture
+  INFO:root:Waiting for '10' message(s) to flush
+  INFO:root:'10' packet(s) in, '10' packet(s) out
+  ```
 
-Pycapa can be configured to capture packets from a network interface and then forward those packets to a Kafka topic.  The following example will capture packets from the `eth0` network interface and forward those to a Kafka topic called `pcap` running on `localhost`.
+**Example**: Capture packets until SIGINT is received.  A SIGINT is the interrupt signal sent when entering CTRL-C in the console.
+  ```
+  $ pycapa --producer \
+      --interface eth0 \
+      --kafka-broker localhost:9092 \
+      --kafka-topic pcap
+  INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'EULLGDOMZDCT'}
+  INFO:root:Starting packet capture
+  ^C
+  INFO:root:Clean shutdown process started
+  INFO:root:Waiting for '0' message(s) to flush
+  INFO:root:'7' packet(s) in, '7' packet(s) out
+  ```
 
-```
-pycapa --producer --kafka localhost:9092 --topic pcap -i eth0
-```
+**Example**: While capturing packets, output diagnostic information every 10 packets.
+  ```
+  $ pycapa --producer \
+      --interface en0 \
+      --kafka-broker localhost:9092 \
+      --kafka-topic pcap \
+      --pretty-print 10
+  INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'YMDSEEDIHVWD'}
+  INFO:root:Starting packet capture
+  10 packet(s) received
+  ac bc 32 bf 0d 43 b8 3e 59 8b 8a 8a 08 00 45 00
+  00 3c 00 00 40 00 40 06 b9 66 c0 a8 00 02 c0 a8
+  00 03 1f 7c d7 14 5f 8b 82 b4 a8 c5 f6 63 a0 12
+  38 90 59 cc 00 00 02 04 05 b4 04 02 08 0a 00 51
+  44 17 39 43 3e 9b 01 03 03 04
+  20 packet(s) received
+  01 00 5e 00 00 fb ac bc 32 bf 0d 43 08 00 45 00
+  00 44 d2 09 00 00 ff 11 47 f8 c0 a8 00 03 e0 00
+  00 fb 14 e9 14 e9 00 30 69 fc 00 00 00 00 00 01
+  00 00 00 00 00 00 0b 5f 67 6f 6f 67 6c 65 63 61
+  73 74 04 5f 74 63 70 05 6c 6f 63 61 6c 00 00 0c
+  80 01
+  ^C
+  INFO:root:Clean shutdown process started
+  INFO:root:Waiting for '2' message(s) to flush
+  INFO:root:'20' packet(s) in, '20' packet(s) out
+  ```
 
-To output debug messages every 100 captured packets, run the following.
+**Example**: Consume 10 packets from the Kafka topic `pcap` running on `localhost:9092`, then pipe those into Wireshark's `tshark` for deep packet inspection.
+  ```
+  $ pycapa --consumer \
+      --kafka-broker localhost:9092 \
+      --kafka-topic pcap \
+      --max-packets 10 \
+      | tshark -i -
+  Capturing on 'Standard input'
+      1   0.000000 ArrisGro_0e:65:df → Apple_bf:0d:43 ARP 56 Who has 192.168.0.3? Tell 192.168.0.1
+      2   0.000044 Apple_bf:0d:43 → ArrisGro_0e:65:df ARP 42 192.168.0.3 is at ac:bc:32:bf:0d:43
+      3   0.203495 fe80::1286:8cff:fe0e:65df → ff02::1      ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
+      4   2.031988  192.168.0.3 → 96.27.183.249 TCP 54 55110 → 443 [ACK] Seq=1 Ack=1 Win=4108 Len=0
+      5   2.035816 192.30.253.125 → 192.168.0.3  TLSv1.2 97 Application Data
+      6   2.035892  192.168.0.3 → 192.30.253.125 TCP 66 54671 → 443 [ACK] Seq=1 Ack=32 Win=4095 Len=0 TSval=961120495 TSecr=2658503052
+      7   2.035994  192.168.0.3 → 192.30.253.125 TLSv1.2 101 Application Data
+      8   2.053866 96.27.183.249 → 192.168.0.3  TCP 66 [TCP ACKed unseen segment] 443 → 55110 [ACK] Seq=1 Ack=2 Win=243 Len=0 TSval=728145145 TSecr=961030381
+      9   2.083872 192.30.253.125 → 192.168.0.3  TCP 66 443 → 54671 [ACK] Seq=32 Ack=36 Win=31 Len=0 TSval=2658503087 TSecr=961120495
+     10   3.173189 fe80::1286:8cff:fe0e:65df → ff02::1      ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
+  10 packets captured
+  ```
 
-```
-pycapa --producer --kafka localhost:9092 --topic pcap -i eth0 --debug 100
-```
+**Example**: Consume 10 packets and create a libpcap-compliant pcap file.
+  ```
+  $ pycapa --consumer \
+      --kafka-broker localhost:9092 \
+      --kafka-topic pcap \
+      --max-packets 10 \
+      > out.pcap
+  $ tshark -r out.pcap
+      1   0.000000 199.193.204.147 → 192.168.0.3  TLSv1.2 151 Application Data
+      2   0.000005 199.193.204.147 → 192.168.0.3  TLSv1.2 1191 Application Data
+      3   0.000088  192.168.0.3 → 199.193.204.147 TCP 66 54788 → 443 [ACK] Seq=1 Ack=86 Win=4093 Len=0 TSval=961284465 TSecr=943744612
+      4   0.000089  192.168.0.3 → 199.193.204.147 TCP 66 54788 → 443 [ACK] Seq=1 Ack=1211 Win=4058 Len=0 TSval=961284465 TSecr=943744612
+      5   0.948788  192.168.0.3 → 192.30.253.125 TCP 54 54671 → 443 [ACK] Seq=1 Ack=1 Win=4096 Len=0
+      6   1.005175 192.30.253.125 → 192.168.0.3  TCP 66 [TCP ACKed unseen segment] 443 → 54671 [ACK] Seq=1 Ack=2 Win=31 Len=0 TSval=2658544467 TSecr=961240339
+      7   1.636312 fe80::1286:8cff:fe0e:65df → ff02::1      ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
+      8   2.253052 192.175.27.112 → 192.168.0.3  TLSv1.2 928 Application Data
+      9   2.253140  192.168.0.3 → 192.175.27.112 TCP 66 55078 → 443 [ACK] Seq=1 Ack=863 Win=4069 Len=0 TSval=961286699 TSecr=967172238
+     10   2.494769  192.168.0.3 → 224.0.0.251  MDNS 82 Standard query 0x0000 PTR _googlecast._tcp.local, "QM" question
+  ```
 
-### Consumer Mode
+### Kerberos
 
-Pycapa can be configured to consume packets from a Kafka topic and then write those packets to a [libpcap-compliant file](https://wiki.wireshark.org/Development/LibpcapFileFormat).  To read 100 packets from a kafka topic and then write those to a [libpcap-compliant file](https://wiki.wireshark.org/Development/LibpcapFileFormat), run the following command.  The file `out.pcap` can then be opened with a tool such as Wireshark for further validation.
+Pycapa can be used in a Kerberized environment.  Follow these additional steps to use Pycapa with Kerberos.  The examples below make the following assumptions, which may need to be altered to fit your environment.
 
-```
-pycapa --consumer --kafka localhost:9092 --topic pcap --n 100 > out.pcap
-```
+  * The Kafka broker is at `kafka1:6667`
+  * Zookeeper is at `zookeeper1:2181`
+  * The Kafka security protocol is `SASL_PLAINTEXT`
+  * The keytab used is located at `/etc/security/keytabs/metron.headless.keytab`
+  * The service principal is `metron@EXAMPLE.COM`
 
-To consume packets from Kafka continuously and print debug messages every 10 packets, run the following command.  
+1. Build Librdkafka with SASL support (`--enable-sasl`) and install it at your chosen `$PREFIX`.
+    ```
+    wget https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz  -O - | tar -xz
+    cd librdkafka-0.9.4/
+    ./configure --prefix=$PREFIX --enable-sasl
+    make
+    make install
+    ```
 
-```
-pycapa --consumer --kafka localhost:9092 --topic pcap --debug 10
-```
+1. Validate that Librdkafka does indeed support SASL.  Run the following command and ensure that `sasl` is returned as a built-in feature.
+    ```
+    $ examples/rdkafka_example -X builtin.features
+    builtin.features = gzip,snappy,ssl,sasl,regex
+    ```
+
+   If it is not, ensure that you have `libsasl` or `libsasl2` installed.  On CentOS, this can be installed with the following command.
+    ```
+    yum install -y cyrus-sasl cyrus-sasl-devel cyrus-sasl-gssapi
+    ```
 
-## Dependencies
+1. Grant access to your Kafka topic and consumer group.  In this example, the topic is named `pcap` and the consumer group is named `pycapa`.
+    ```
+    ${KAFKA_HOME}/bin/kafka-acls.sh \
+      --authorizer kafka.security.auth.SimpleAclAuthorizer \
+      --authorizer-properties zookeeper.connect=zookeeper1:2181 \
+      --add \
+      --allow-principal User:metron \
+      --topic pcap
 
-* [kafka-python](https://github.com/dpkp/kafka-python)
-* [pcapy](https://github.com/CoreSecurity/pcapy)
+    ${KAFKA_HOME}/bin/kafka-acls.sh \
+      --authorizer kafka.security.auth.SimpleAclAuthorizer \
+      --authorizer-properties zookeeper.connect=zookeeper1:2181 \
+      --add \
+      --allow-principal User:metron \
+      --group pycapa
+    ```
 
-## Implementation
+1. Use Pycapa as you normally would, but append the following three additional parameters:
+  * `security.protocol`
+  * `sasl.kerberos.keytab`
+  * `sasl.kerberos.principal`
+  ```
+  $ pycapa --producer \
+      --interface eth0 \
+      --kafka-broker kafka1:6667 \
+      --kafka-topic pcap --max-packets 10 \
+      -X security.protocol=SASL_PLAINTEXT \
+      -X sasl.kerberos.keytab=/etc/security/keytabs/metron.headless.keytab \
+      -X sasl.kerberos.principal=metron-metron@METRONEXAMPLE.COM
+  INFO:root:Connecting to Kafka; {'sasl.kerberos.principal': 'metron-metron@METRONEXAMPLE.COM', 'group.id': 'ORNLVWJZZUAA', 'security.protocol': 'SASL_PLAINTEXT', 'sasl.kerberos.keytab': '/etc/security/keytabs/metron.headless.keytab', 'bootstrap.servers': 'kafka1:6667'}
+  INFO:root:Starting packet capture
+  INFO:root:Waiting for '1' message(s) to flush
+  INFO:root:'10' packet(s) in, '10' packet(s) out
+  ```
 
-When run in Producer Mode, Pycapa embeds the raw network packet data in the Kafka message.  The message key contains the timestamp indicating when the packet was captured in microseconds from the epoch.  This value is in network byte order.
+FAQs
+====
+
+**Question**: How do I get more logs?
+
+Use the following two command-line arguments to get detailed logging.
+```
+-X debug=all --log-level DEBUG
+```

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/47e5aa70/metron-sensors/pycapa/pycapa/common.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/common.py b/metron-sensors/pycapa/pycapa/common.py
index 3fb78fd..535f756 100644
--- a/metron-sensors/pycapa/pycapa/common.py
+++ b/metron-sensors/pycapa/pycapa/common.py
@@ -19,18 +19,22 @@ import struct
 
 
 def to_hex(s):
+    """ Transforms a string to hexadecimal notation. """
     hex_str = ' '.join("{0:02x}".format(ord(c)) for c in s)
     return '\n'.join([hex_str[i:i+48] for i in range(0, len(hex_str), 48)])
 
 
 def to_date(epoch_micros):
+    """ Transforms a timestamp in epoch microseconds to a more legible format. """
     epoch_secs = epoch_micros / 1000000.0
     return datetime.fromtimestamp(epoch_secs).strftime('%Y-%m-%d %H:%M:%S.%f')
 
 
 def pack_ts(ts):
+    """ Packs a timestamp into a binary form. """
     return struct.pack(">Q", ts)
 
 
 def unpack_ts(packed_ts):
+    """ Unpacks a timestamp from a binary form. """
     return struct.unpack_from(">Q", bytes(packed_ts), 0)[0]
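+
+
+# Usage sketch with hypothetical values: pack_ts/unpack_ts round-trip a 64-bit,
+# big-endian (network byte order) timestamp in epoch microseconds, e.g.
+#
+#   packed = pack_ts(1493412242000000)   # -> 8 bytes, struct format ">Q"
+#   unpack_ts(packed)                    # -> 1493412242000000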

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/47e5aa70/metron-sensors/pycapa/pycapa/consumer.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/consumer.py b/metron-sensors/pycapa/pycapa/consumer.py
index 950dc83..7029f25 100644
--- a/metron-sensors/pycapa/pycapa/consumer.py
+++ b/metron-sensors/pycapa/pycapa/consumer.py
@@ -15,50 +15,98 @@
 #  limitations under the License.
 #
 import sys
+import threading
+import signal
+import pcapy
+import argparse
+import random
+import logging
 import time
-import kafka
 import struct
+from confluent_kafka import Consumer, KafkaException, KafkaError
 from common import to_date, to_hex, unpack_ts
 
 
-def global_header(magic=0xa1b2c3d4L, version_major=2, version_minor=4, zone=0,
-                  sigfigs=0, snaplen=65535, network=1):
+finished = threading.Event()
+
+
+def signal_handler(signum, frame):
+    """ Initiates a clean shutdown for a SIGINT """
+
+    finished.set()
+    logging.debug("Clean shutdown process started")
+
+
+def global_header(args, magic=0xa1b2c3d4L, version_major=2, version_minor=4, zone=0,
+                  sigfigs=0, network=1):
+    """ Returns the global header used in libpcap-compliant file. """
+
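+    # libpcap global header fields: magic number, file format version
+    # (major.minor), GMT offset, timestamp accuracy, snapshot length, and
+    # link-layer type (network=1 denotes Ethernet)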
     return struct.pack("IHHIIII", magic, version_major, version_minor, zone,
-                       sigfigs, snaplen, network)
+        sigfigs, args.snaplen, network)
+
 
+def packet_header(msg):
+    """ Returns the packet header used in a libpcap-compliant file. """
 
-def packet_header(pkt_raw, msg_key):
-    epoch_micros = struct.unpack_from(">Q", bytes(msg_key), 0)[0]
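+    # per-packet record header fields: timestamp seconds, timestamp
+    # microseconds, captured length, and original length on the wire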
+    epoch_micros = struct.unpack_from(">Q", bytes(msg.key()), 0)[0]
     secs = epoch_micros / 1000000
     usec = epoch_micros % 1000000
-    caplen = wirelen = len(pkt_raw)
+    caplen = wirelen = len(msg.value())
     hdr = struct.pack('IIII', secs, usec, caplen, wirelen)
     return hdr
 
 
-def consumer(args):
+def consumer(args, poll_timeout=3.0):
+    """ Consumes packets from a Kafka topic. """
+
+    # setup the signal handler
+    signal.signal(signal.SIGINT, signal_handler)
+
     # connect to kafka
-    brokers = args.kafka_brokers.split(",")
-    kafka_consumer = kafka.KafkaConsumer(args.topic, bootstrap_servers=brokers)
-
-    # if debug not set, write libpcap global header
-    if args.debug == 0:
-        sys.stdout.write(global_header())
-
-    # start packet capture
-    packet_count = 0
-    for msg in kafka_consumer:
-
-        # if debug not set, write the packet header and packet
-        if args.debug == 0:
-            sys.stdout.write(packet_header(msg.value, msg.key))
-            sys.stdout.write(msg.value)
-
-        elif packet_count % args.debug == 0:
-            print 'Packet: count=%s dt=%s topic=%s' % (
-                packet_count, to_date(unpack_ts(msg.key)), args.topic)
-            print to_hex(msg.value)
-
-        packet_count += 1
-        if args.packet_count > 0 and packet_count >= args.packet_count:
-            break
+    logging.debug("Connecting to Kafka; %s", args.kafka_configs)
+    kafka_consumer = Consumer(args.kafka_configs)
+    kafka_consumer.subscribe([args.kafka_topic])
+
+    # if 'pretty-print' not set, write libpcap global header
+    if args.pretty_print == 0:
+        sys.stdout.write(global_header(args))
+        sys.stdout.flush()
+
+    try:
+        pkts_in = 0
+        while not finished.is_set() and (args.max_packets <= 0 or pkts_in < args.max_packets):
+
+            # consume a message from kafka
+            msg = kafka_consumer.poll(timeout=poll_timeout)
+            if msg is None:
+                # no message received
+                continue
+
+            elif msg.error():
+
+                if msg.error().code() == KafkaError._PARTITION_EOF:
+                    logging.debug("reached end of topar: topic=%s, partition=%d, offset=%s", msg.topic(), msg.partition(), msg.offset())
+                elif msg.error():
+                    raise KafkaException(msg.error())
+
+            else:
+                pkts_in += 1
+                logging.debug("Packet received: pkts_in=%d", pkts_in)
+
+                if args.pretty_print == 0:
+
+                    # write the packet header and packet
+                    sys.stdout.write(packet_header(msg))
+                    sys.stdout.write(msg.value())
+                    sys.stdout.flush()
+
+                elif pkts_in % args.pretty_print == 0:
+
+                    # pretty print
+                    print 'Packet: count=%s date=%s topic=%s' % (
+                        pkts_in, to_date(unpack_ts(msg.key())), args.kafka_topic)
+                    print to_hex(msg.value())
+
+    finally:
+        sys.stdout.close()
+        kafka_consumer.close()

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/47e5aa70/metron-sensors/pycapa/pycapa/producer.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/producer.py b/metron-sensors/pycapa/pycapa/producer.py
index 8910b9b..7374522 100644
--- a/metron-sensors/pycapa/pycapa/producer.py
+++ b/metron-sensors/pycapa/pycapa/producer.py
@@ -14,45 +14,96 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 #
+import sys
+import threading
+import signal
 import pcapy
 import argparse
-import kafka
 import random
+import logging
 from common import to_date, to_hex, pack_ts
+from confluent_kafka import Producer
+
+finished = threading.Event()
+
+
+def signal_handler(signum, frame):
+    """ Initiates a clean shutdown for a SIGINT """
+
+    finished.set()
+    logging.info("Clean shutdown process started")
 
 
 def partitioner(key_bytes, all_parts, avail_parts):
+    """ Partitions messages randomly across all available partitions. """
+
     return random.choice(avail_parts)
 
 
 def timestamp(pkt_hdr):
+    """ Returns the timestamp of the packet in epoch milliseconds. """
+
     (epoch_secs, delta_micros) = pkt_hdr.getts()
     epoch_micros = (epoch_secs * 1000000.0) + delta_micros
     return epoch_micros
 
 
-def producer(args):
+def delivery_callback(err, msg):
+    """ Callback executed when message delivery either succeeds or fails. """
+
+    # initialize counter, if needed
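+    # (a function attribute acts as a counter that persists across callbacks)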
+    if not hasattr(delivery_callback, "pkts_out"):
+        delivery_callback.pkts_out = 0
+
+    if err:
+        logging.error("message delivery failed: error=%s", err)
+    else:
+        logging.debug("message delivery succeeded: pkts_out=%d", delivery_callback.pkts_out)
+        delivery_callback.pkts_out += 1
+
+
+def producer(args, sniff_timeout_ms=500, sniff_promisc=True):
+    """ Captures packets from a network interface and sends them to a Kafka topic. """
+
+    # setup the signal handler
+    signal.signal(signal.SIGINT, signal_handler)
+
     # connect to kafka
-    producer = kafka.KafkaProducer(
-        bootstrap_servers=args.kafka_brokers.split(","),
-        partitioner=partitioner)
+    logging.info("Connecting to Kafka; %s", args.kafka_configs)
+    kafka_producer = Producer(args.kafka_configs)
 
     # initialize packet capture
-    capture = pcapy.open_live(args.interface, 65535, True, 3000)
-    packet_count = 0
-
-    # start packet capture
-    while True:
-        (pkt_hdr, pkt_raw) = capture.next()
-        if pkt_hdr is not None:
-
-            # send packet to kafka
-            pkt_ts = timestamp(pkt_hdr)
-            producer.send(args.topic, key=pack_ts(pkt_ts), value=pkt_raw)
-
-            # debug messages, if needed
-            packet_count += 1
-            if args.debug > 0 and packet_count % args.debug == 0:
-                print 'Sent Packet: count=%s dt=%s topic=%s' % (
-                    packet_count, to_date(pkt_ts), args.topic)
-                print to_hex(pkt_raw)
+    logging.info("Starting packet capture")
+    capture = pcapy.open_live(args.interface, args.snaplen, sniff_promisc, sniff_timeout_ms)
+    pkts_in = 0
+
+    try:
+        while not finished.is_set() and (args.max_packets <= 0 or pkts_in < args.max_packets):
+
+            # capture a packet
+            (pkt_hdr, pkt_raw) = capture.next()
+            if pkt_hdr is not None:
+                logging.debug("Packet received: pkts_in=%d, pkt_len=%s", pkts_in, pkt_hdr.getlen())
+                pkts_in += 1
+                pkt_ts = timestamp(pkt_hdr)
+                kafka_producer.produce(args.kafka_topic, key=pack_ts(pkt_ts), value=pkt_raw, callback=delivery_callback)
+
+                # debug messages, if needed
+                if args.pretty_print > 0 and pkts_in % args.pretty_print == 0:
+                    print '{} packet(s) received'.format(pkts_in)
+                    print to_hex(pkt_raw)
+
+            # serve the callback queue
+            kafka_producer.poll(0)
+
+    finally:
+        # flush all messages
+        logging.info("Waiting for '%d' message(s) to flush", len(kafka_producer))
+        kafka_producer.flush()
+
+        # pkts_out may not be initialized if the callback was never executed
+        pkts_out = 0
+        if hasattr(delivery_callback, "pkts_out"):
+            pkts_out = delivery_callback.pkts_out
+
+        logging.info("'%d' packet(s) in, '%d' packet(s) out", pkts_in, pkts_out)

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/47e5aa70/metron-sensors/pycapa/pycapa/pycapa_cli.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/pycapa_cli.py b/metron-sensors/pycapa/pycapa/pycapa_cli.py
index 33b2df6..f650280 100644
--- a/metron-sensors/pycapa/pycapa/pycapa_cli.py
+++ b/metron-sensors/pycapa/pycapa/pycapa_cli.py
@@ -15,63 +15,127 @@
 #  limitations under the License.
 #
 import argparse
+import logging
+import random
+import string
 from producer import producer
 from consumer import consumer
 
 
 def make_parser():
+    """ Creates a command-line argument parser. """
+
     parser = argparse.ArgumentParser()
-    parser.add_argument('-p',
-                        '--producer',
+    parser.add_argument('-p', '--producer',
                         help='sniff packets and send to kafka',
                         dest='producer',
                         action='store_true',
                         default=False)
-    parser.add_argument('-c',
-                        '--consumer',
+
+    parser.add_argument('-c', '--consumer',
                         help='read packets from kafka',
                         dest='consumer',
                         action='store_true',
                         default=False)
-    parser.add_argument('-k',
-                        '--kafka',
+
+    parser.add_argument('-k', '--kafka-broker',
                         help='kafka broker(s)',
                         dest='kafka_brokers')
-    parser.add_argument('-t',
-                        '--topic',
+
+    parser.add_argument('-t', '--kafka-topic',
                         help='kafka topic',
-                        dest='topic')
-    parser.add_argument('-n',
-                        '--number',
-                        help='number of packets to consume',
-                        dest='packet_count',
-                        type=int)
-    parser.add_argument('-d',
-                        '--debug',
-                        help='debug every X packets',
-                        dest='debug',
+                        dest='kafka_topic')
+
+    parser.add_argument('-i', '--interface',
+                        help='network interface to listen on',
+                        dest='interface',
+                        metavar='NETWORK_IFACE')
+
+    parser.add_argument('-m', '--max-packets',
+                        help='stop after this number of packets',
+                        dest='max_packets',
                         type=int,
                         default=0)
-    parser.add_argument('-i',
-                        '--interface',
-                        help='interface to listen on',
-                        dest='interface')
+
+    parser.add_argument('-pp','--pretty-print',
+                        help='pretty print every X packets',
+                        dest='pretty_print',
+                        type=int,
+                        default=0)
+
+    parser.add_argument('-ll', '--log-level',
+                        help='set the log level',
+                        dest='log_level',
+                        default='INFO')
+
+    parser.add_argument('-X',
+                        type=keyval,
+                        help='define a kafka client parameter; key=value',
+                        dest='kafka_configs',
+                        action='append')
+
+    parser.add_argument('-s','--snaplen',
+                        help="snapshot length",
+                        dest='snaplen',
+                        type=int,
+                        default=65535)
+
     return parser
 
 
+def keyval(input, delim="="):
+    """ Expects a single key=value. """
+
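+    # e.g. "security.protocol=SASL_PLAINTEXT" -> ["security.protocol", "SASL_PLAINTEXT"]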
+    keyval = input.split(delim)
+    if len(keyval) != 2:
+        raise ValueError("expected key=value")
+
+    return keyval
+
+
 def valid_args(args):
-    if args.producer and args.kafka_brokers and args.topic and args.interface:
+    """ Validates the command-line arguments. """
+
+    if args.producer and args.kafka_brokers and args.kafka_topic and args.interface:
         return True
-    elif args.consumer and args.kafka_brokers and args.topic:
+    elif args.consumer and args.kafka_brokers and args.kafka_topic:
         return True
     else:
         return False
 
 
+def clean_kafka_configs(args):
+    """ Cleans and transforms the Kafka client configs. """
+
+    # transform 'kafka_configs' args from "list of lists" to dict
+    configs = {}
+    if args.kafka_configs is not None:
+        for keyval in args.kafka_configs:
+            configs[keyval[0]] = keyval[1]
+
+    # bootstrap servers can be set as "-X bootstrap.servers=KAFKA:9092" or "-k KAFKA:9092"
+    bootstrap_key = "bootstrap.servers"
+    if bootstrap_key not in configs:
+        configs[bootstrap_key] = args.kafka_brokers
+
+    # if no 'group.id', generate a random one
+    group_key = "group.id"
+    if group_key not in configs:
+        configs[group_key] = ''.join(random.choice(string.ascii_uppercase) for _ in range(12))
+
+    args.kafka_configs = configs
+
 def main():
     parser = make_parser()
     args = parser.parse_args()
 
+    # setup logging
+    numeric_log_level = getattr(logging, args.log_level.upper(), None)
+    if not isinstance(numeric_log_level, int):
+        raise ValueError('invalid log level: %s' % args.log_level)
+    logging.basicConfig(level=numeric_log_level)
+
+    clean_kafka_configs(args)
     if not valid_args(args):
         parser.print_help()
     elif args.consumer:
@@ -81,5 +145,6 @@ def main():
     else:
         parser.print_help()
 
+
 if __name__ == '__main__':
     main()

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/47e5aa70/metron-sensors/pycapa/requirements.txt
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/requirements.txt b/metron-sensors/pycapa/requirements.txt
index b2dcb05..0119daa 100644
--- a/metron-sensors/pycapa/requirements.txt
+++ b/metron-sensors/pycapa/requirements.txt
@@ -14,6 +14,6 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 #
-kafka-python
+confluent_kafka
 pcapy
 argparse