Posted to commits@metron.apache.org by ni...@apache.org on 2017/06/01 15:16:04 UTC

metron git commit: METRON-937 Pycapa - Consume Messages from Begin, End, or Stored Offsets (nickwallen) closes apache/metron#570

Repository: metron
Updated Branches:
  refs/heads/master 74bc236d5 -> 8779eb3fe


METRON-937 Pycapa - Consume Messages from Begin, End, or Stored Offsets (nickwallen) closes apache/metron#570


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/8779eb3f
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/8779eb3f
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/8779eb3f

Branch: refs/heads/master
Commit: 8779eb3fe6d2767d6b79d665adec735380cf2d61
Parents: 74bc236
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Jun 1 11:15:42 2017 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Jun 1 11:15:42 2017 -0400

----------------------------------------------------------------------
 metron-sensors/pycapa/README.md            | 169 +++++++++++++-----------
 metron-sensors/pycapa/pycapa/consumer.py   |  51 ++++++-
 metron-sensors/pycapa/pycapa/producer.py   |  24 +++-
 metron-sensors/pycapa/pycapa/pycapa_cli.py |  34 +++--
 4 files changed, 179 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/README.md b/metron-sensors/pycapa/README.md
index 6f41d63..fed1399 100644
--- a/metron-sensors/pycapa/README.md
+++ b/metron-sensors/pycapa/README.md
@@ -72,19 +72,22 @@ Pycapa has two primary runtime modes.
 ### Parameters
 
 ```
-$ pycapa -h
+$ pycapa --help
 usage: pycapa [-h] [-p] [-c] [-k KAFKA_BROKERS] [-t KAFKA_TOPIC]
-              [-i NETWORK_IFACE] [-m MAX_PACKETS] [-pp PRETTY_PRINT]
-              [-ll LOG_LEVEL] [-X KAFKA_CONFIGS] [-s SNAPLEN]
+              [-o {begin,end,stored}] [-i NETWORK_IFACE] [-m MAX_PACKETS]
+              [-pp PRETTY_PRINT] [-ll LOG_LEVEL] [-X KAFKA_CONFIGS]
+              [-s SNAPLEN]
 
 optional arguments:
   -h, --help            show this help message and exit
   -p, --producer        sniff packets and send to kafka
   -c, --consumer        read packets from kafka
   -k KAFKA_BROKERS, --kafka-broker KAFKA_BROKERS
-                        kafka broker(s)
+                        kafka broker(s) as host:port
   -t KAFKA_TOPIC, --kafka-topic KAFKA_TOPIC
                         kafka topic
+  -o {begin,end,stored}, --kafka-offset {begin,end,stored}
+                        kafka offset to consume from; default=end
   -i NETWORK_IFACE, --interface NETWORK_IFACE
                         network interface to listen on
   -m MAX_PACKETS, --max-packets MAX_PACKETS
@@ -92,91 +95,81 @@ optional arguments:
   -pp PRETTY_PRINT, --pretty-print PRETTY_PRINT
                         pretty print every X packets
   -ll LOG_LEVEL, --log-level LOG_LEVEL
-                        set the log level
+                        set the log level; DEBUG, INFO, WARN
   -X KAFKA_CONFIGS      define a kafka client parameter; key=value
   -s SNAPLEN, --snaplen SNAPLEN
-                        snapshot length
+                        capture only the first X bytes of each packet;
+                        default=65535
 ```
 
 ### Examples
 
-**Example**: Capture 10 packets from the `eth0` network interface and forward those to a Kafka topic called `pcap` running on `localhost:9092`.
-  ```
-  $ pycapa --producer \
-      --interface eth0 \
-      --kafka-broker localhost:9092 \
-      --kafka-topic pcap \
-      --max-packets 10
-  INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'AWBHMIAESAHJ'}
-  INFO:root:Starting packet capture
-  INFO:root:Waiting for '10' message(s) to flush
-  INFO:root:'10' packet(s) in, '10' packet(s) out
-  ```
+#### Example 1
 
-**Example**: Capture packets until SIGINT is received.  A SIGINT is the interrupt signal sent when entering CTRL-D in the console.
-  ```
-  $ pycapa --producer \
-      --interface eth0 \
-      --kafka-broker localhost:9092 \
-      --kafka-topic pcap
-  INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'EULLGDOMZDCT'}
-  INFO:root:Starting packet capture
-  ^C
-  INFO:root:Clean shutdown process started
-  INFO:root:Waiting for '0' message(s) to flush
-  INFO:root:'7' packet(s) in, '7' packet(s) out
-  ```
+Capture 10 packets from the `eth0` network interface and forward those to a Kafka topic called `pcap` running on `localhost:9092`.  The process will not terminate until all messages have been delivered to Kafka.
 
-**Example**: While capturing packets, output diagnostic information every 10 packets.
-  ```
-  $ pycapa --producer \
-      --interface en0 \
-      --kafka-broker localhost:9092 \
-      --kafka-topic pcap \
-      --pretty-print 10
-  INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'YMDSEEDIHVWD'}
+```
+$ pycapa --producer \
+    --interface eth0 \
+    --kafka-broker localhost:9092 \
+    --kafka-topic pcap \
+    --max-packets 10
+INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'AWBHMIAESAHJ'}
+INFO:root:Starting packet capture
+INFO:root:Waiting for '6' message(s) to flush
+INFO:root:'10' packet(s) in, '10' packet(s) out
+```
+
+#### Example 2
+
+Capture packets until SIGINT is received (the interrupt signal sent when entering CTRL-C in the console).  In this example, nothing is reported while packets are captured and delivered to Kafka.  Simply wait a few seconds, then type CTRL-C and the number of packets will be reported.
+
+```
+$ pycapa --producer \
+    --interface en0 \
+    --kafka-broker localhost:9092 \
+    --kafka-topic pcap
+INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'EULLGDOMZDCT'}
+INFO:root:Starting packet capture
+^C
+INFO:root:Clean shutdown process started
+INFO:root:Waiting for '2' message(s) to flush
+INFO:root:'21' packet(s) in, '21' packet(s) out
+```
+
+#### Example 3
+
+While capturing packets, output diagnostic information every 5 packets.  Diagnostics will report when packets have been received from the network interface and when they have been successfully delivered to Kafka.
+
+```
+$ pycapa --producer \
+    --interface eth0 \
+    --kafka-broker localhost:9092 \
+    --kafka-topic pcap \
+    --pretty-print 5
+  INFO:root:Connecting to Kafka; {'bootstrap.servers': 'localhost:9092', 'group.id': 'UAWINMBDNQEH'}
   INFO:root:Starting packet capture
-  10 packet(s) received
-  ac bc 32 bf 0d 43 b8 3e 59 8b 8a 8a 08 00 45 00
-  00 3c 00 00 40 00 40 06 b9 66 c0 a8 00 02 c0 a8
-  00 03 1f 7c d7 14 5f 8b 82 b4 a8 c5 f6 63 a0 12
-  38 90 59 cc 00 00 02 04 05 b4 04 02 08 0a 00 51
-  44 17 39 43 3e 9b 01 03 03 04
-  20 packet(s) received
-  01 00 5e 00 00 fb ac bc 32 bf 0d 43 08 00 45 00
-  00 44 d2 09 00 00 ff 11 47 f8 c0 a8 00 03 e0 00
-  00 fb 14 e9 14 e9 00 30 69 fc 00 00 00 00 00 01
-  00 00 00 00 00 00 0b 5f 67 6f 6f 67 6c 65 63 61
-  73 74 04 5f 74 63 70 05 6c 6f 63 61 6c 00 00 0c
-  80 01
+  Packet received[5]
+  Packet delivered[5]: date=2017-05-08 14:48:54.474031 topic=pcap partition=0 offset=29086 len=42
+  Packet received[10]
+  Packet received[15]
+  Packet delivered[10]: date=2017-05-08 14:48:58.879710 topic=pcap partition=0 offset=0 len=187
+  Packet delivered[15]: date=2017-05-08 14:48:59.633127 topic=pcap partition=0 offset=0 len=43
+  Packet received[20]
+  Packet delivered[20]: date=2017-05-08 14:49:01.949628 topic=pcap partition=0 offset=29101 len=134
+  Packet received[25]
   ^C
   INFO:root:Clean shutdown process started
-  INFO:root:Waiting for '2' message(s) to flush
-  INFO:root:'20' packet(s) in, '20' packet(s) out
-  ```
+  Packet delivered[25]: date=2017-05-08 14:49:03.589940 topic=pcap partition=0 offset=0 len=142
+  INFO:root:Waiting for '1' message(s) to flush
+  INFO:root:'27' packet(s) in, '27' packet(s) out
 
-**Example**: Consume 10 packets from the Kafka topic `pcap` running on `localhost:9092`, then pipe those into Wireshark for DPI.
-  ```
-  $ pycapa --consumer \
-      --kafka-broker localhost:9092 \
-      --kafka-topic pcap \
-      --max-packets 10 \
-      | tshark -i -
-  Capturing on 'Standard input'
-      1   0.000000 ArrisGro_0e:65:df → Apple_bf:0d:43 ARP 56 Who has 192.168.0.3? Tell 192.168.0.1
-      2   0.000044 Apple_bf:0d:43 → ArrisGro_0e:65:df ARP 42 192.168.0.3 is at ac:bc:32:bf:0d:43
-      3   0.203495 fe80::1286:8cff:fe0e:65df → ff02::1      ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
-      4   2.031988  192.168.0.3 → 96.27.183.249 TCP 54 55110 → 443 [ACK] Seq=1 Ack=1 Win=4108 Len=0
-      5   2.035816 192.30.253.125 → 192.168.0.3  TLSv1.2 97 Application Data
-      6   2.035892  192.168.0.3 → 192.30.253.125 TCP 66 54671 → 443 [ACK] Seq=1 Ack=32 Win=4095 Len=0 TSval=961120495 TSecr=2658503052
-      7   2.035994  192.168.0.3 → 192.30.253.125 TLSv1.2 101 Application Data
-      8   2.053866 96.27.183.249 → 192.168.0.3  TCP 66 [TCP ACKed unseen segment] 443 → 55110 [ACK] Seq=1 Ack=2 Win=243 Len=0 TSval=728145145 TSecr=961030381
-      9   2.083872 192.30.253.125 → 192.168.0.3  TCP 66 443 → 54671 [ACK] Seq=32 Ack=36 Win=31 Len=0 TSval=2658503087 TSecr=961120495
-     10   3.173189 fe80::1286:8cff:fe0e:65df → ff02::1      ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
-  10 packets captured
-  ```
+```
+
+#### Example 4
+
+Consume 10 packets and create a libpcap-compliant pcap file.
 
-**Example**: Consume 10 packets and create a libpcap-compliant pcap file.
   ```
   $ pycapa --consumer \
       --kafka-broker localhost:9092 \
@@ -196,6 +189,30 @@ optional arguments:
      10   2.494769  192.168.0.3 → 224.0.0.251  MDNS 82 Standard query 0x0000 PTR _googlecast._tcp.local, "QM" question
   ```
 
+#### Example 5
+
+Consume 10 packets from the Kafka topic `pcap` running on `localhost:9092`, then pipe those into Wireshark for DPI.
+
+```
+$ pycapa --consumer \
+    --kafka-broker localhost:9092 \
+    --kafka-topic pcap \
+    --max-packets 10 \
+    | tshark -i -
+Capturing on 'Standard input'
+    1   0.000000 ArrisGro_0e:65:df → Apple_bf:0d:43 ARP 56 Who has 192.168.0.3? Tell 192.168.0.1
+    2   0.000044 Apple_bf:0d:43 → ArrisGro_0e:65:df ARP 42 192.168.0.3 is at ac:bc:32:bf:0d:43
+    3   0.203495 fe80::1286:8cff:fe0e:65df → ff02::1      ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
+    4   2.031988  192.168.0.3 → 96.27.183.249 TCP 54 55110 → 443 [ACK] Seq=1 Ack=1 Win=4108 Len=0
+    5   2.035816 192.30.253.125 → 192.168.0.3  TLSv1.2 97 Application Data
+    6   2.035892  192.168.0.3 → 192.30.253.125 TCP 66 54671 → 443 [ACK] Seq=1 Ack=32 Win=4095 Len=0 TSval=961120495 TSecr=2658503052
+    7   2.035994  192.168.0.3 → 192.30.253.125 TLSv1.2 101 Application Data
+    8   2.053866 96.27.183.249 → 192.168.0.3  TCP 66 [TCP ACKed unseen segment] 443 → 55110 [ACK] Seq=1 Ack=2 Win=243 Len=0 TSval=728145145 TSecr=961030381
+    9   2.083872 192.30.253.125 → 192.168.0.3  TCP 66 443 → 54671 [ACK] Seq=32 Ack=36 Win=31 Len=0 TSval=2658503087 TSecr=961120495
+   10   3.173189 fe80::1286:8cff:fe0e:65df → ff02::1      ICMPv6 134 Router Advertisement from 10:86:8c:0e:65:df
+10 packets captured
+```
+
 ### Kerberos
 
The probe can be used in a Kerberized environment.  Follow these additional steps to use Pycapa with Kerberos.  The following assumptions have been made; these may need to be altered to fit your environment.

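The Kerberos steps themselves fall outside this hunk, but as a rough sketch of what the `-X` pass-through documented above enables (the broker address, keytab path, and principal below are hypothetical placeholders, and librdkafka must be built with SASL support), the client security settings map directly onto a `Producer` config dict:

```python
from confluent_kafka import Producer

# a minimal sketch of the settings Pycapa's -X flag forwards to librdkafka;
# the broker, keytab path, and principal are hypothetical placeholders
configs = {
    'bootstrap.servers': 'kafka-broker:6667',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.kerberos.keytab': '/etc/security/keytabs/metron.keytab',
    'sasl.kerberos.principal': 'metron@EXAMPLE.COM',
}
producer = Producer(configs)
```

The equivalent command line would pass each key as its own `-X key=value` argument.
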
http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/pycapa/consumer.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/consumer.py b/metron-sensors/pycapa/pycapa/consumer.py
index 7029f25..484ae3c 100644
--- a/metron-sensors/pycapa/pycapa/consumer.py
+++ b/metron-sensors/pycapa/pycapa/consumer.py
@@ -23,7 +23,7 @@ import random
 import logging
 import time
 import struct
-from confluent_kafka import Consumer, KafkaException, KafkaError
+from confluent_kafka import Consumer, KafkaException, KafkaError, OFFSET_BEGINNING, OFFSET_END, OFFSET_STORED
 from common import to_date, to_hex, unpack_ts
 
 
@@ -56,16 +56,51 @@ def packet_header(msg):
     return hdr
 
 
+def seek_to_end(consumer, partitions):
+    """ Advance all partitions to the last offset. """
+
+    # advance to the end, ignoring any committed offsets
+    for p in partitions:
+        p.offset = OFFSET_END
+    consumer.assign(partitions)
+
+
+def seek_to_begin(consumer, partitions):
+    """ Advance all partitions to the first offset. """
+
+    # advance to the beginning, ignoring any committed offsets
+    for p in partitions:
+        p.offset = OFFSET_BEGINNING
+    consumer.assign(partitions)
+
+
+def seek_to_stored(consumer, partitions):
+    """ Advance all partitions to the stored offset. """
+
+    # advance to the stored/committed offset for each partition
+    for p in partitions:
+        p.offset = OFFSET_STORED
+    consumer.assign(partitions)
+
+
 def consumer(args, poll_timeout=3.0):
     """ Consumes packets from a Kafka topic. """
 
     # setup the signal handler
     signal.signal(signal.SIGINT, signal_handler)
 
+    # where to start consuming messages from
+    kafka_offset_options = {
+        "begin": seek_to_begin,
+        "end": seek_to_end,
+        "stored": seek_to_stored
+    }
+    on_assign_cb = kafka_offset_options[args.kafka_offset]
+
     # connect to kafka
     logging.debug("Connecting to Kafka; %s", args.kafka_configs)
     kafka_consumer = Consumer(args.kafka_configs)
-    kafka_consumer.subscribe([args.kafka_topic])
+    kafka_consumer.subscribe([args.kafka_topic], on_assign=on_assign_cb)
 
     # if 'pretty-print' not set, write libpcap global header
     if args.pretty_print == 0:
@@ -85,8 +120,10 @@ def consumer(args, poll_timeout=3.0):
             elif msg.error():
 
                 if msg.error().code() == KafkaError._PARTITION_EOF:
-                    logging.debug("reached end of topar: topic=%s, partition=%d, offset=%s", msg.topic(), msg.partition(), msg.offset())
-                elif msg.error():
+                    if args.pretty_print > 0:
+                        print "Reached end of topar: topic=%s, partition=%d, offset=%s" % (
+                            msg.topic(), msg.partition(), msg.offset())
+                else:
                     raise KafkaException(msg.error())
 
             else:
@@ -103,9 +140,9 @@ def consumer(args, poll_timeout=3.0):
                 elif pkts_in % args.pretty_print == 0:
 
                     # pretty print
-                    print 'Packet: count=%s date=%s topic=%s' % (
-                        pkts_in, to_date(unpack_ts(msg.key())), args.kafka_topic)
-                    print to_hex(msg.value())
+                    print 'Packet[%s]: date=%s topic=%s partition=%s offset=%s len=%s' % (
+                        pkts_in, to_date(unpack_ts(msg.key())), args.kafka_topic,
+                        msg.partition(), msg.offset(), len(msg.value()))
 
     finally:
         sys.stdout.close()

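For readers unfamiliar with the `on_assign` hook used above: the callback fires once the group rebalance assigns partitions, which is the point where offsets can be rewritten before consumption starts.  A minimal standalone sketch, assuming a local broker and the `pcap` topic:

```python
from confluent_kafka import Consumer, OFFSET_BEGINNING

def start_from_beginning(consumer, partitions):
    # rewrite each assigned partition's offset, then re-assign so the
    # consumer begins reading from the requested position
    for p in partitions:
        p.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'example'})
consumer.subscribe(['pcap'], on_assign=start_from_beginning)

msg = consumer.poll(timeout=3.0)
if msg is not None and not msg.error():
    print('partition=%d offset=%d len=%d' % (msg.partition(), msg.offset(), len(msg.value())))
consumer.close()
```

This is what `pycapa --consumer ... --kafka-offset begin` does under the hood; `end` and `stored` swap in `OFFSET_END` and `OFFSET_STORED` respectively.
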
http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/pycapa/producer.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/producer.py b/metron-sensors/pycapa/pycapa/producer.py
index 7374522..ec21fdc 100644
--- a/metron-sensors/pycapa/pycapa/producer.py
+++ b/metron-sensors/pycapa/pycapa/producer.py
@@ -21,11 +21,11 @@ import pcapy
 import argparse
 import random
 import logging
-from common import to_date, to_hex, pack_ts
+from common import to_date, to_hex, pack_ts, unpack_ts
 from confluent_kafka import Producer
 
 finished = threading.Event()
-
+producer_args = None  # set by producer(); lets delivery_callback() read the pretty-print setting
 
 def signal_handler(signum, frame):
     """ Initiates a clean shutdown for a SIGINT """
@@ -57,10 +57,18 @@ def delivery_callback(err, msg):
 
     if err:
         logging.error("message delivery failed: error=%s", err)
-    else:
-        logging.debug("message delivery succeeded: pkts_out=%d", delivery_callback.pkts_out)
+
+    elif msg is not None:
         delivery_callback.pkts_out += 1
 
+        # read the pretty-print frequency from the module-level args; 0 disables it
+        pretty_print = producer_args.pretty_print if producer_args else 0
+
+        if pretty_print > 0 and delivery_callback.pkts_out % pretty_print == 0:
+            print 'Packet delivered[%s]: date=%s topic=%s partition=%s offset=%s len=%s' % (
+                delivery_callback.pkts_out, to_date(unpack_ts(msg.key())), msg.topic(),
+                msg.partition(), msg.offset(), len(msg.value()))
+
 
 def producer(args, sniff_timeout_ms=500, sniff_promisc=True):
     """ Captures packets from a network interface and sends them to a Kafka topic. """
@@ -68,6 +76,9 @@ def producer(args, sniff_timeout_ms=500, sniff_promisc=True):
     # setup the signal handler
     signal.signal(signal.SIGINT, signal_handler)
 
+    global producer_args
+    producer_args = args
+
     # connect to kafka
     logging.info("Connecting to Kafka; %s", args.kafka_configs)
     kafka_producer = Producer(args.kafka_configs)
@@ -88,10 +99,9 @@ def producer(args, sniff_timeout_ms=500, sniff_promisc=True):
                 pkt_ts = timestamp(pkt_hdr)
                 kafka_producer.produce(args.kafka_topic, key=pack_ts(pkt_ts), value=pkt_raw, callback=delivery_callback)
 
-                # debug messages, if needed
+                # pretty print, if needed
                 if args.pretty_print > 0 and pkts_in % args.pretty_print == 0:
-                    print '{} packet(s) received'.format(pkts_in)
-                    print to_hex(pkt_raw)
+                    print 'Packet received[%s]' % (pkts_in)
 
             # serve the callback queue
             kafka_producer.poll(0)

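A note on the delivery-callback flow in `producer.py`: `produce()` only enqueues a message, and the callback fires later from `poll()` or `flush()` once the broker acknowledges it, which is why the received and delivered counts drift apart in Example 3 above.  A minimal sketch, assuming a local broker:

```python
from confluent_kafka import Producer

def delivered(err, msg):
    # runs from poll()/flush() once the broker acks (or rejects) the message
    if err:
        print('delivery failed: %s' % err)
    else:
        print('delivered: topic=%s partition=%d offset=%d' % (
            msg.topic(), msg.partition(), msg.offset()))

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('pcap', key='key', value='value', callback=delivered)
producer.poll(0)   # serve the callback queue without blocking
producer.flush()   # block until everything queued has been delivered
```
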
http://git-wip-us.apache.org/repos/asf/metron/blob/8779eb3f/metron-sensors/pycapa/pycapa/pycapa_cli.py
----------------------------------------------------------------------
diff --git a/metron-sensors/pycapa/pycapa/pycapa_cli.py b/metron-sensors/pycapa/pycapa/pycapa_cli.py
index f650280..609205a 100644
--- a/metron-sensors/pycapa/pycapa/pycapa_cli.py
+++ b/metron-sensors/pycapa/pycapa/pycapa_cli.py
@@ -39,13 +39,19 @@ def make_parser():
                         default=False)
 
     parser.add_argument('-k', '--kafka-broker',
-                        help='kafka broker(s)',
+                        help='kafka broker(s) as host:port',
                         dest='kafka_brokers')
 
     parser.add_argument('-t', '--kafka-topic',
                         help='kafka topic',
                         dest='kafka_topic')
 
+    parser.add_argument('-o', '--kafka-offset',
+                        help='kafka offset to consume from; default=end',
+                        dest='kafka_offset',
+                        choices=['begin','end','stored'],
+                        default='end')
+
     parser.add_argument('-i', '--interface',
                         help='network interface to listen on',
                         dest='interface',
@@ -64,7 +70,7 @@ def make_parser():
                         default=0)
 
     parser.add_argument('-ll', '--log-level',
-                        help='set the log level',
+                        help='set the log level; DEBUG, INFO, WARN',
                         dest='log_level',
                         default='INFO')
 
@@ -75,7 +81,7 @@ def make_parser():
                         action='append')
 
     parser.add_argument('-s','--snaplen',
-                        help="snapshot length",
+                        help="capture only the first X bytes of each packet; default=65535",
                         dest='snaplen',
                         type=int,
                         default=65535)
@@ -96,13 +102,21 @@ def keyval(input, delim="="):
 def valid_args(args):
     """ Validates the command-line arguments. """
 
-    if args.producer and args.kafka_brokers and args.kafka_topic and args.interface:
-        return True
-    elif args.consumer and args.kafka_brokers and args.kafka_topic:
-        return True
-    else:
+    if not args.producer and not args.consumer:
+        print "error: expected either --consumer or --producer \n"
         return False
 
+    elif args.producer and not (args.kafka_brokers and args.kafka_topic and args.interface):
+        print "error: missing required args: expected [--kafka-broker, --kafka-topic, --interface] \n"
+        return False
+
+    elif args.consumer and not (args.kafka_brokers and args.kafka_topic):
+        print "error: missing required args: expected [--kafka-broker, --kafka-topic] \n"
+        return False
+
+    else:
+        return True
+
 
 def clean_kafka_configs(args):
     """ Cleans and transforms the Kafka client configs. """
@@ -116,7 +130,7 @@ def clean_kafka_configs(args):
     # bootstrap servers can be set as a "-X bootstrap.servers=KAFKA:9092" or "-k KAFKA:9092"
     bootstrap_key = "bootstrap.servers"
     if(bootstrap_key not in configs):
-        configs[bootstrap_key] = args.kafka_brokers;
+        configs[bootstrap_key] = args.kafka_brokers
 
     # if no 'group.id', generate a random one
     group_key = "group.id"
@@ -125,7 +139,9 @@ def clean_kafka_configs(args):
 
     args.kafka_configs = configs
 
+
 def main():
+
     parser = make_parser()
     args = parser.parse_args()
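
To make the precedence in `clean_kafka_configs` concrete: anything passed with `-X` wins, `--kafka-broker` backfills `bootstrap.servers`, and a random `group.id` is generated when none was supplied.  A standalone sketch of that merge logic (the helper name is ours, not part of the CLI):

```python
import random
import string

def merge_kafka_configs(kafka_brokers, x_configs):
    # -X key=value pairs take precedence; -k fills in bootstrap.servers;
    # a random group.id is generated when none was supplied
    configs = dict(kv.split('=', 1) for kv in x_configs)
    configs.setdefault('bootstrap.servers', kafka_brokers)
    configs.setdefault('group.id',
                       ''.join(random.choice(string.ascii_uppercase) for _ in range(12)))
    return configs

print(merge_kafka_configs('localhost:9092', ['security.protocol=PLAINTEXT']))
```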