You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ma...@apache.org on 2017/06/01 21:42:02 UTC
[43/44] metron git commit: METRON-937 Pycapa - Consume Messages from
Begin, End, or Stored Offsets (nickwallen) closes apache/metron#570
METRON-937 Pycapa - Consume Messages from Begin, End, or Stored Offsets (nickwallen) closes apache/metron#570
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/8779eb3f
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/8779eb3f
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/8779eb3f
Branch: refs/heads/Metron_0.4.0
Commit: 8779eb3fe6d2767d6b79d665adec735380cf2d61
Parents: 74bc236
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Jun 1 11:15:42 2017 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Jun 1 11:15:42 2017 -0400
----------------------------------------------------------------------
metron-sensors/pycapa/README.md | 169 +++++++++++++-----------
metron-sensors/pycapa/pycapa/consumer.py | 51 ++++++-
metron-sensors/pycapa/pycapa/producer.py | 24 +++-
metron-sensors/pycapa/pycapa/pycapa_cli.py | 34 +++--
4 files changed, 179 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/README.md b/metron-sensors/pycapa/README.md
index 6f41d63..fed1399 100644
--- a/metron-sensors/pycapa/README.md
+++ b/metron-sensors/pycapa/README.md
@@ -72,19 +72,22 @@ Pycapa has two primary runtime modes.
### Parameters
```
-$ pycapa -h
+$ pycapa --help
usage: pycapa [-h] [-p] [-c] [-k KAFKA_BROKERS] [-t KAFKA_TOPIC]
- [-i NETWORK_IFACE] [-m MAX_PACKETS] [-pp PRETTY_PRINT]
- [-ll LOG_LEVEL] [-X KAFKA_CONFIGS] [-s SNAPLEN]
+ [-o {begin,end,stored}] [-i NETWORK_IFACE] [-m MAX_PACKETS]
+ [-pp PRETTY_PRINT] [-ll LOG_LEVEL] [-X KAFKA_CONFIGS]
+ [-s SNAPLEN]
optional arguments:
-h, --help show this help message and exit
-p, --producer sniff packets and send to kafka
-c, --consumer read packets from kafka
-k KAFKA_BROKERS, --kafka-broker KAFKA_BROKERS
- kafka broker(s)
+ kafka broker(s) as host:port
-t KAFKA_TOPIC, --kafka-topic KAFKA_TOPIC
kafka topic
+ -o {begin,end,stored}, --kafka-offset {begin,end,stored}
+ kafka offset to consume from; default=end
-i NETWORK_IFACE, --interface NETWORK_IFACE
network interface to listen on
-m MAX_PACKETS, --max-packets MAX_PACKETS
@@ -92,91 +95,81 @@ optional arguments:
-pp PRETTY_PRINT, --pretty-print PRETTY_PRINT
pretty print every X packets
-ll LOG_LEVEL, --log-level LOG_LEVEL
- set the log level
+ set the log level; DEBUG, INFO, WARN
-X KAFKA_CONFIGS define a kafka client parameter; key=value
-s SNAPLEN, --snaplen SNAPLEN
- snapshot length
+ capture only the first X bytes of each packet;
+ default=65535
```
### Examples
-**Example**: Capture 10 packets from the `eth0` network interface and forward those to a Kafka topic called `pcap` running on `localhost:9092`.
- ```
- $ pycapa --producer \
- --interface eth0 \
- --kafka-broker localhost:9092 \
- --kafka-topic pcap \
- --max-packets 10
- INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'AWBHMIAESAHJ'}
- INFO:root:Starting packet capture
- INFO:root:Waiting for '10' message(s) to flush
- INFO:root:'10' packet(s) in, '10' packet(s) out
- ```
+#### Example 1
-**Example**: Capture packets until SIGINT is received. A SIGINT is the interrupt signal sent when entering CTRL-D in the console.
- ```
- $ pycapa --producer \
- --interface eth0 \
- --kafka-broker localhost:9092 \
- --kafka-topic pcap
- INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'EULLGDOMZDCT'}
- INFO:root:Starting packet capture
- ^C
- INFO:root:Clean shutdown process started
- INFO:root:Waiting for '0' message(s) to flush
- INFO:root:'7' packet(s) in, '7' packet(s) out
- ```
+Capture 10 packets from the `eth0` network interface and forward them to a Kafka topic called `pcap` on the broker at `localhost:9092`. The process will not terminate until all messages have been delivered to Kafka.
-**Example**: While capturing packets, output diagnostic information every 10 packets.
- ```
- $ pycapa --producer \
- --interface en0 \
- --kafka-broker localhost:9092 \
- --kafka-topic pcap \
- --pretty-print 10
- INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'YMDSEEDIHVWD'}
+```
+$ pycapa --producer \
+ --interface eth0 \
+ --kafka-broker localhost:9092 \
+ --kafka-topic pcap \
+ --max-packets 10
+INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'AWBHMIAESAHJ'}
+INFO:root:Starting packet capture
+INFO:root:Waiting for '6' message(s) to flush
+INFO:root:'10' packet(s) in, '10' packet(s) out
+```
+
+#### Example 2
+
+Capture packets until SIGINT is received (the interrupt signal sent when pressing CTRL-C in the console). In this example, nothing is reported while packets are captured and delivered to Kafka. Simply wait a few seconds, then press CTRL-C and the number of packets will be reported.
+
+```
+$ pycapa --producer \
+ --interface en0 \
+ --kafka-broker localhost:9092 \
+ --kafka-topic pcap
+INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'EULLGDOMZDCT'}
+INFO:root:Starting packet capture
+^C
+INFO:root:Clean shutdown process started
+INFO:root:Waiting for '2' message(s) to flush
+INFO:root:'21' packet(s) in, '21' packet(s) out
+```
+
+#### Example 3
+
+While capturing packets, output diagnostic information every 5 packets. Diagnostics will report when packets have been received from the network interface and when they have been successfully delivered to Kafka.
+
+```
+$ pycapa --producer \
+ --interface eth0 \
+ --kafka-broker localhost:9092 \
+ --kafka-topic pcap \
+ --pretty-print 5
+ INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'UAWINMBDNQEH'}
INFO:root:Starting packet capture
- 10 packet(s) received
- ac bc 32 bf 0d 43 b8 3e 59 8b 8a 8a 08 00 45 00
- 00 3c 00 00 40 00 40 06 b9 66 c0 a8 00 02 c0 a8
- 00 03 1f 7c d7 14 5f 8b 82 b4 a8 c5 f6 63 a0 12
- 38 90 59 cc 00 00 02 04 05 b4 04 02 08 0a 00 51
- 44 17 39 43 3e 9b 01 03 03 04
- 20 packet(s) received
- 01 00 5e 00 00 fb ac bc 32 bf 0d 43 08 00 45 00
- 00 44 d2 09 00 00 ff 11 47 f8 c0 a8 00 03 e0 00
- 00 fb 14 e9 14 e9 00 30 69 fc 00 00 00 00 00 01
- 00 00 00 00 00 00 0b 5f 67 6f 6f 67 6c 65 63 61
- 73 74 04 5f 74 63 70 05 6c 6f 63 61 6c 00 00 0c
- 80 01
+ Packet received[5]
+ Packet delivered[5]: date=2017-05-08 14:48:54.474031 topic=pcap partition=0 offset=29086 len=42
+ Packet received[10]
+ Packet received[15]
+ Packet delivered[10]: date=2017-05-08 14:48:58.879710 topic=pcap partition=0 offset=0 len=187
+ Packet delivered[15]: date=2017-05-08 14:48:59.633127 topic=pcap partition=0 offset=0 len=43
+ Packet received[20]
+ Packet delivered[20]: date=2017-05-08 14:49:01.949628 topic=pcap partition=0 offset=29101 len=134
+ Packet received[25]
^C
INFO:root:Clean shutdown process started
- INFO:root:Waiting for '2' message(s) to flush
- INFO:root:'20' packet(s) in, '20' packet(s) out
- ```
+ Packet delivered[25]: date=2017-05-08 14:49:03.589940 topic=pcap partition=0 offset=0 len=142
+ INFO:root:Waiting for '1' message(s) to flush
+ INFO:root:'27' packet(s) in, '27' packet(s) out
-**Example**: Consume 10 packets from the Kafka topic `pcap` running on `localhost:9092`, then pipe those into Wireshark for DPI.
- ```
- $ pycapa --consumer \
- --kafka-broker localhost:9092 \
- --kafka-topic pcap \
- --max-packets 10 \
- | tshark -i -
- Capturing on 'Standard input'
- 1 0.000000 ArrisGro_0e:65:df → Apple_bf:0d:43 ARP 56 Who has 192.168.0.3? Tell 192.168.0.1
- 2 0.000044 Apple_bf:0d:43 → ArrisGro_0e:65:df ARP 42 192.168.0.3 is at ac:bc:32:bf:0d:43
- 3 0.203495 fe80::1286:8cff:fe0e:65df → ff02::1 ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
- 4 2.031988 192.168.0.3 → 96.27.183.249 TCP 54 55110 → 443 [ACK] Seq=1 Ack=1 Win=4108 Len=0
- 5 2.035816 192.30.253.125 → 192.168.0.3 TLSv1.2 97 Application Data
- 6 2.035892 192.168.0.3 → 192.30.253.125 TCP 66 54671 → 443 [ACK] Seq=1 Ack=32 Win=4095 Len=0 TSval=961120495 TSecr=2658503052
- 7 2.035994 192.168.0.3 → 192.30.253.125 TLSv1.2 101 Application Data
- 8 2.053866 96.27.183.249 → 192.168.0.3 TCP 66 [TCP ACKed unseen segment] 443 → 55110 [ACK] Seq=1 Ack=2 Win=243 Len=0 TSval=728145145 TSecr=961030381
- 9 2.083872 192.30.253.125 → 192.168.0.3 TCP 66 443 → 54671 [ACK] Seq=32 Ack=36 Win=31 Len=0 TSval=2658503087 TSecr=961120495
- 10 3.173189 fe80::1286:8cff:fe0e:65df → ff02::1 ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
- 10 packets captured
- ```
+```
+
+#### Example 4
+
+Consume 10 packets and create a libpcap-compliant pcap file.
-**Example**: Consume 10 packets and create a libpcap-compliant pcap file.
```
$ pycapa --consumer \
--kafka-broker localhost:9092 \
@@ -196,6 +189,30 @@ optional arguments:
10 2.494769 192.168.0.3 → 224.0.0.251 MDNS 82 Standard query 0x0000 PTR _googlecast._tcp.local, "QM" question
```
+#### Example 5
+
+Consume 10 packets from the Kafka topic `pcap` on the broker at `localhost:9092`, then pipe them into `tshark` (Wireshark's command-line tool) for deep packet inspection (DPI).
+
+```
+$ pycapa --consumer \
+ --kafka-broker localhost:9092 \
+ --kafka-topic pcap \
+ --max-packets 10 \
+ | tshark -i -
+Capturing on 'Standard input'
+ 1 0.000000 ArrisGro_0e:65:df → Apple_bf:0d:43 ARP 56 Who has 192.168.0.3? Tell 192.168.0.1
+ 2 0.000044 Apple_bf:0d:43 → ArrisGro_0e:65:df ARP 42 192.168.0.3 is at ac:bc:32:bf:0d:43
+ 3 0.203495 fe80::1286:8cff:fe0e:65df → ff02::1 ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
+ 4 2.031988 192.168.0.3 → 96.27.183.249 TCP 54 55110 → 443 [ACK] Seq=1 Ack=1 Win=4108 Len=0
+ 5 2.035816 192.30.253.125 → 192.168.0.3 TLSv1.2 97 Application Data
+ 6 2.035892 192.168.0.3 → 192.30.253.125 TCP 66 54671 → 443 [ACK] Seq=1 Ack=32 Win=4095 Len=0 TSval=961120495 TSecr=2658503052
+ 7 2.035994 192.168.0.3 → 192.30.253.125 TLSv1.2 101 Application Data
+ 8 2.053866 96.27.183.249 → 192.168.0.3 TCP 66 [TCP ACKed unseen segment] 443 → 55110 [ACK] Seq=1 Ack=2 Win=243 Len=0 TSval=728145145 TSecr=961030381
+ 9 2.083872 192.30.253.125 → 192.168.0.3 TCP 66 443 → 54671 [ACK] Seq=32 Ack=36 Win=31 Len=0 TSval=2658503087 TSecr=961120495
+ 10 3.173189 fe80::1286:8cff:fe0e:65df → ff02::1 ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
+10 packets captured
+```
+
### Kerberos
The probe can be used in a Kerberized environment. Follow these additional steps to use Pycapa with Kerberos. The following assumptions have been made. These may need altered to fit your environment.
http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/pycapa/consumer.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/consumer.py b/metron-sensors/pycapa/pycapa/consumer.py
index 7029f25..484ae3c 100644
--- a/metron-sensors/pycapa/pycapa/consumer.py
+++ b/metron-sensors/pycapa/pycapa/consumer.py
@@ -23,7 +23,7 @@ import random
import logging
import time
import struct
-from confluent_kafka import Consumer, KafkaException, KafkaError
+from confluent_kafka import Consumer, KafkaException, KafkaError, OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED
from common import to_date, to_hex, unpack_ts
@@ -56,16 +56,51 @@ def packet_header(msg):
return hdr
+def seek_to_end(consumer, partitions):
+ """ on_assign callback: start every assigned partition at the end (OFFSET_END) so only new messages are read. """
+
+ # advance to the end, ignoring any committed offsets
+ for p in partitions:
+ p.offset = OFFSET_END
+ consumer.assign(partitions)
+
+
+def seek_to_begin(consumer, partitions):
+ """ on_assign callback: rewind every assigned partition to the beginning (OFFSET_BEGINNING). """
+
+ # rewind to the beginning, ignoring any committed offsets
+ for p in partitions:
+ p.offset = OFFSET_BEGINNING
+ consumer.assign(partitions)
+
+
+def seek_to_stored(consumer, partitions):
+ """ on_assign callback: resume every assigned partition from its stored offset (OFFSET_STORED). """
+
+ # resume from the stored/committed offsets rather than forcing the beginning or end
+ for p in partitions:
+ p.offset = OFFSET_STORED
+ consumer.assign(partitions)
+
+
def consumer(args, poll_timeout=3.0):
""" Consumes packets from a Kafka topic. """
# setup the signal handler
signal.signal(signal.SIGINT, signal_handler)
+ # where to start consuming messages from
+ kafka_offset_options = {
+ "begin": seek_to_begin,
+ "end": seek_to_end,
+ "stored": seek_to_stored
+ }
+ on_assign_cb = kafka_offset_options[args.kafka_offset]
+
# connect to kafka
logging.debug("Connecting to Kafka; %s", args.kafka_configs)
kafka_consumer = Consumer(args.kafka_configs)
- kafka_consumer.subscribe([args.kafka_topic])
+ kafka_consumer.subscribe([args.kafka_topic], on_assign=on_assign_cb)
# if 'pretty-print' not set, write libpcap global header
if args.pretty_print == 0:
@@ -85,8 +120,10 @@ def consumer(args, poll_timeout=3.0):
elif msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
- logging.debug("reached end of topar: topic=%s, partition=%d, offset=%s", msg.topic(), msg.partition(), msg.offset())
- elif msg.error():
+ if args.pretty_print > 0:
+ print "Reached end of topar: topic=%s, partition=%d, offset=%s" % (
+ msg.topic(), msg.partition(), msg.offset())
+ else:
raise KafkaException(msg.error())
else:
@@ -103,9 +140,9 @@ def consumer(args, poll_timeout=3.0):
elif pkts_in % args.pretty_print == 0:
# pretty print
- print 'Packet: count=%s date=%s topic=%s' % (
- pkts_in, to_date(unpack_ts(msg.key())), args.kafka_topic)
- print to_hex(msg.value())
+ print 'Packet[%s]: date=%s topic=%s partition=%s offset=%s len=%s' % (
+ pkts_in, to_date(unpack_ts(msg.key())), args.kafka_topic,
+ msg.partition(), msg.offset(), len(msg.value()))
finally:
sys.stdout.close()
http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/pycapa/producer.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/producer.py b/metron-sensors/pycapa/pycapa/producer.py
index 7374522..ec21fdc 100644
--- a/metron-sensors/pycapa/pycapa/producer.py
+++ b/metron-sensors/pycapa/pycapa/producer.py
@@ -21,11 +21,11 @@ import pcapy
import argparse
import random
import logging
-from common import to_date, to_hex, pack_ts
+from common import to_date, to_hex, pack_ts, unpack_ts
from confluent_kafka import Producer
finished = threading.Event()
-
+producer_args = None
def signal_handler(signum, frame):
""" Initiates a clean shutdown for a SIGINT """
@@ -57,10 +57,18 @@ def delivery_callback(err, msg):
if err:
logging.error("message delivery failed: error=%s", err)
- else:
- logging.debug("message delivery succeeded: pkts_out=%d", delivery_callback.pkts_out)
+
+ elif msg is not None:
delivery_callback.pkts_out += 1
+ pretty_print = 0
+ pretty_print = producer_args.pretty_print
+
+ if pretty_print > 0 and delivery_callback.pkts_out % pretty_print == 0:
+ print 'Packet delivered[%s]: date=%s topic=%s partition=%s offset=%s len=%s' % (
+ delivery_callback.pkts_out, to_date(unpack_ts(msg.key())), msg.topic(),
+ msg.partition(), msg.offset(), len(msg.value()))
+
def producer(args, sniff_timeout_ms=500, sniff_promisc=True):
""" Captures packets from a network interface and sends them to a Kafka topic. """
@@ -68,6 +76,9 @@ def producer(args, sniff_timeout_ms=500, sniff_promisc=True):
# setup the signal handler
signal.signal(signal.SIGINT, signal_handler)
+ global producer_args
+ producer_args = args
+
# connect to kafka
logging.info("Connecting to Kafka; %s", args.kafka_configs)
kafka_producer = Producer(args.kafka_configs)
@@ -88,10 +99,9 @@ def producer(args, sniff_timeout_ms=500, sniff_promisc=True):
pkt_ts = timestamp(pkt_hdr)
kafka_producer.produce(args.kafka_topic, key=pack_ts(pkt_ts), value=pkt_raw, callback=delivery_callback)
- # debug messages, if needed
+ # pretty print, if needed
if args.pretty_print > 0 and pkts_in % args.pretty_print == 0:
- print '{} packet(s) received'.format(pkts_in)
- print to_hex(pkt_raw)
+ print 'Packet received[%s]' % (pkts_in)
# serve the callback queue
kafka_producer.poll(0)
http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/pycapa/pycapa_cli.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/pycapa_cli.py b/metron-sensors/pycapa/pycapa/pycapa_cli.py
index f650280..609205a 100644
--- a/metron-sensors/pycapa/pycapa/pycapa_cli.py
+++ b/metron-sensors/pycapa/pycapa/pycapa_cli.py
@@ -39,13 +39,19 @@ def make_parser():
default=False)
parser.add_argument('-k', '--kafka-broker',
- help='kafka broker(s)',
+ help='kafka broker(s) as host:port',
dest='kafka_brokers')
parser.add_argument('-t', '--kafka-topic',
help='kafka topic',
dest='kafka_topic')
+ parser.add_argument('-o', '--kafka-offset',
+ help='kafka offset to consume from; default=end',
+ dest='kafka_offset',
+ choices=['begin','end','stored'],
+ default='end')
+
parser.add_argument('-i', '--interface',
help='network interface to listen on',
dest='interface',
@@ -64,7 +70,7 @@ def make_parser():
default=0)
parser.add_argument('-ll', '--log-level',
- help='set the log level',
+ help='set the log level; DEBUG, INFO, WARN',
dest='log_level',
default='INFO')
@@ -75,7 +81,7 @@ def make_parser():
action='append')
parser.add_argument('-s','--snaplen',
- help="snapshot length",
+ help="capture only the first X bytes of each packet; default=65535",
dest='snaplen',
type=int,
default=65535)
@@ -96,13 +102,21 @@ def keyval(input, delim="="):
def valid_args(args):
""" Validates the command-line arguments. """
- if args.producer and args.kafka_brokers and args.kafka_topic and args.interface:
- return True
- elif args.consumer and args.kafka_brokers and args.kafka_topic:
- return True
- else:
+ if not args.producer and not args.consumer:  # at least one run mode is required (nothing here rejects passing both)
+ print "error: expected either --consumer or --producer \n"  # NOTE(review): errors go to stdout, not stderr
return False
+ elif args.producer and not (args.kafka_brokers and args.kafka_topic and args.interface):  # producer also needs a capture interface
+ print "error: missing required args: expected [--kafka-broker, --kafka-topic, --interface] \n"
+ return False
+
+ elif args.consumer and not (args.kafka_brokers and args.kafka_topic):  # consumer needs only broker and topic
+ print "error: missing required args: expected [--kafka-broker, --kafka-topic] \n"
+ return False
+
+ else:  # every selected mode has its required arguments
+ return True
+
def clean_kafka_configs(args):
""" Cleans and transforms the Kafka client configs. """
@@ -116,7 +130,7 @@ def clean_kafka_configs(args):
# boostrap servers can be set as a "-X bootstrap.servers=KAFKA:9092" or "-k KAFKA:9092"
bootstrap_key = "bootstrap.servers"
if(bootstrap_key not in configs):
- configs[bootstrap_key] = args.kafka_brokers;
+ configs[bootstrap_key] = args.kafka_brokers
# if no 'group.id', generate a random one
group_key = "group.id"
@@ -125,7 +139,9 @@ def clean_kafka_configs(args):
args.kafka_configs = configs
+
def main():
+
parser = make_parser()
args = parser.parse_args()