Posted to commits@metron.apache.org by ni...@apache.org on 2017/04/06 12:35:18 UTC

[1/2] incubator-metron git commit: METRON-822 Improve Fastcapa Performance (nickwallen) closes apache/incubator-metron#509

Repository: incubator-metron
Updated Branches:
  refs/heads/master 58775bc48 -> 81677fd90


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/src/types.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/types.h b/metron-sensors/fastcapa/src/types.h
new file mode 100644
index 0000000..0210eda
--- /dev/null
+++ b/metron-sensors/fastcapa/src/types.h
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef METRON_TYPES_H
+#define METRON_TYPES_H
+
+/**
+ * Tracks packet processing stats.
+ */
+struct app_stats {
+    uint64_t in;
+    uint64_t out;
+    uint64_t depth;
+    uint64_t drops;
+} __rte_cache_aligned;
+
+/**
+ * The parameters required by a receive worker.
+ */
+struct rx_worker_params {
+
+    /* worker identifier */
+    uint16_t worker_id;
+
+    /* queue identifier from which packets are fetched */
+    uint16_t queue_id;
+
+    /* how many packets are pulled off the queue at a time */
+    uint16_t burst_size;
+
+    /* the ring onto which the packets are enqueued */
+    struct rte_ring *output_ring;
+
+    /* metrics */
+    struct app_stats stats;
+
+} __rte_cache_aligned;
+
+/**
+ * The parameters required by a transmit worker.
+ */
+struct tx_worker_params {
+
+    /* worker identifier */
+    uint16_t worker_id;
+
+    /* how many packets are pulled off the ring at a time */
+    uint16_t burst_size;
+
+    /* the ring from which packets are dequeued */
+    struct rte_ring *input_ring;
+
+    /* identifies the kafka client connection used by the worker */
+    int kafka_id;
+ 
+    /* worker metrics */
+    struct app_stats stats;
+
+} __rte_cache_aligned;
+
+
+#endif
+


[2/2] incubator-metron git commit: METRON-822 Improve Fastcapa Performance (nickwallen) closes apache/incubator-metron#509

Posted by ni...@apache.org.
METRON-822 Improve Fastcapa Performance (nickwallen) closes apache/incubator-metron#509


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/81677fd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/81677fd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/81677fd9

Branch: refs/heads/master
Commit: 81677fd906a01a768777682ebf7caed44708c86e
Parents: 58775bc
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Apr 6 08:34:49 2017 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Apr 6 08:34:49 2017 -0400

----------------------------------------------------------------------
 .../roles/fastcapa/defaults/main.yml            |  14 +-
 .../roles/fastcapa/tasks/fastcapa.yml           |   2 +-
 .../roles/fastcapa/templates/fastcapa           |  19 +-
 .../roles/fastcapa/templates/fastcapa.conf      |   4 +-
 .../fastcapa-test-platform/vars/main.yml        |  16 +-
 metron-sensors/fastcapa/.gitignore              |   2 +
 metron-sensors/fastcapa/Makefile                |   3 +-
 metron-sensors/fastcapa/README.md               | 435 ++++++++++++++++++-
 metron-sensors/fastcapa/conf/fastcapa.conf      | 124 ++++++
 metron-sensors/fastcapa/conf/localhost.kafka    |  67 ---
 metron-sensors/fastcapa/src/Makefile            |   4 +-
 metron-sensors/fastcapa/src/args.c              | 218 +++++++---
 metron-sensors/fastcapa/src/args.h              |  48 +-
 metron-sensors/fastcapa/src/kafka.c             | 223 ++++++++--
 metron-sensors/fastcapa/src/kafka.h             |  24 +-
 metron-sensors/fastcapa/src/main.c              | 396 ++++++++++-------
 metron-sensors/fastcapa/src/main.h              |  38 +-
 metron-sensors/fastcapa/src/types.h             |  78 ++++
 18 files changed, 1353 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-deployment/roles/fastcapa/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/fastcapa/defaults/main.yml b/metron-deployment/roles/fastcapa/defaults/main.yml
index cc231c1..19168a8 100644
--- a/metron-deployment/roles/fastcapa/defaults/main.yml
+++ b/metron-deployment/roles/fastcapa/defaults/main.yml
@@ -17,8 +17,8 @@
 ---
 # dpdk
 dpdk_home: "/usr/local/dpdk"
-dpdk_version: "16.11"
-dpdk_sdk: "/root/dpdk-{{ dpdk_version }}"
+dpdk_version: "16.11.1"
+dpdk_sdk: "/root/dpdk-stable-{{ dpdk_version }}"
 dpdk_src_url: "http://fast.dpdk.org/rel/dpdk-{{ dpdk_version }}.tar.xz"
 dpdk_target: "x86_64-native-linuxapp-gcc"
 num_huge_pages: 512
@@ -26,8 +26,16 @@ extra_cflags: -g
 
 # fastcapa
 fastcapa_work_dir: /root/fastcapa
+fastcapa_build_dir: "{{ fastcapa_work_dir }}/build/app/"
 fastcapa_prefix: /usr/local/bin
 fastcapa_ld_library_path: /usr/local/lib
+fastcapa_bin: fastcapa
+
+# fastcapa settings
 fastcapa_portmask: 0x01
 fastcapa_kafka_config: /etc/fastcapa.conf
-fastcapa_bin: fastcapa
+fastcapa_topic: pcap
+fastcapa_burst_size: 32
+fastcapa_nb_rx_desc: 1024
+fastcapa_nb_rx_queue: 1
+fastcapa_tx_ring_size: 2048

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-deployment/roles/fastcapa/tasks/fastcapa.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/fastcapa/tasks/fastcapa.yml b/metron-deployment/roles/fastcapa/tasks/fastcapa.yml
index cad5b68..b555668 100644
--- a/metron-deployment/roles/fastcapa/tasks/fastcapa.yml
+++ b/metron-deployment/roles/fastcapa/tasks/fastcapa.yml
@@ -30,7 +30,7 @@
     LD_LIBRARY_PATH: "{{ fastcapa_ld_library_path }}"
 
 - name: Install fastcapa
-  shell: "cp {{ fastcapa_work_dir }}/src/build/app/{{ fastcapa_bin }} {{ fastcapa_prefix }}"
+  shell: "cp {{ fastcapa_build_dir }}/{{ fastcapa_bin }} {{ fastcapa_prefix }}"
   args:
     chdir: "{{ fastcapa_work_dir }}"
     creates: "{{ fastcapa_prefix }}/{{ fastcapa_bin }}"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-deployment/roles/fastcapa/templates/fastcapa
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/fastcapa/templates/fastcapa b/metron-deployment/roles/fastcapa/templates/fastcapa
index 637317f..5ee6abb 100644
--- a/metron-deployment/roles/fastcapa/templates/fastcapa
+++ b/metron-deployment/roles/fastcapa/templates/fastcapa
@@ -34,6 +34,14 @@ DAEMONLOG=/var/log/$NAME.log
 NOW=`date`
 DAEMON_PATH="{{ dpdk_sdk }}"
 
+PORT_MASK="{{ fastcapa_portmask }}"
+KAFKA_TOPIC="{{ fastcapa_topic }}"
+KAFKA_CONFIG="{{ fastcapa_kafka_config }}"
+BURST_SIZE="{{ fastcapa_burst_size }}"
+NB_RX_DESC="{{ fastcapa_nb_rx_desc }}"
+NB_RX_QUEUE="{{ fastcapa_nb_rx_queue }}"
+TX_RING_SIZE="{{ fastcapa_tx_ring_size }}"
+
 case "$1" in
   start)
     printf "%-50s" "Starting $NAME..."
@@ -42,9 +50,14 @@ case "$1" in
     cd $DAEMON_PATH
     DAEMON="{{ fastcapa_prefix }}/{{ fastcapa_bin }}"
     DAEMONOPTS+=" -- "
-    DAEMONOPTS+="-p {{ fastcapa_portmask }} "
-    DAEMONOPTS+="-t {{ fastcapa_topic }} "
-    DAEMONOPTS+="-c {{ fastcapa_kafka_config }} "
+    DAEMONOPTS+="-p $PORTMASK "
+    DAEMONOPTS+="-t $KAFKA_TOPIC "
+    DAEMONOPTS+="-c $KAFKA_CONFIG "
+    DAEMONOPTS+="-b $BURST_SIZE "
+    DAEMONOPTS+="-r $NB_RX_DESC "
+    DAEMONOPTS+="-q $NB_RX_QUEUE "
+    DAEMONOPTS+="-x $TX_RING_SIZE "
+
 
     PID=`$DAEMON $DAEMONOPTS >> $DAEMONLOG 2>&1 & echo $!`
     if [ -z $PID ]; then

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-deployment/roles/fastcapa/templates/fastcapa.conf
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/fastcapa/templates/fastcapa.conf b/metron-deployment/roles/fastcapa/templates/fastcapa.conf
index 6f6a89f..7d9eae4 100644
--- a/metron-deployment/roles/fastcapa/templates/fastcapa.conf
+++ b/metron-deployment/roles/fastcapa/templates/fastcapa.conf
@@ -27,7 +27,7 @@ metadata.broker.list = {{ kafka_broker_url }}
 client.id = metron-fastcapa
 
 # max number of messages allowed on the producer queue
-queue.buffering.max.messages = 1000
+queue.buffering.max.messages = 1000000
 
 # maximum time, in milliseconds, for buffering data on the producer queue
 queue.buffering.max.ms = 3000
@@ -45,7 +45,7 @@ message.send.max.retries = 5
 retry.backoff.ms = 250
 
 # how often statistics are emitted; 0 = never
-statistics.interval.ms = 0
+statistics.interval.ms = 5000
 
 # only provide delivery reports for failed messages
 delivery.report.only.error = false

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-deployment/vagrant/fastcapa-test-platform/vars/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/vagrant/fastcapa-test-platform/vars/main.yml b/metron-deployment/vagrant/fastcapa-test-platform/vars/main.yml
index 50e93d7..74bab29 100644
--- a/metron-deployment/vagrant/fastcapa-test-platform/vars/main.yml
+++ b/metron-deployment/vagrant/fastcapa-test-platform/vars/main.yml
@@ -15,16 +15,28 @@
 #  limitations under the License.
 #
 ---
+
+# 0.9.4+ required for fastcapa
+librdkafka_version: 0.9.4
+librdkafka_url: https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz 
+  
 dpdk_device: ["00:08.0"]
 dpdk_target: "x86_64-native-linuxapp-gcc"
 num_huge_pages: 512
-fastcapa_portmask: "0x01"
-fastcapa_topic: pcap
 kafka_broker_url: source:9092
 zookeeper_url: source:2181
 pcap_replay_interface: eth1
 kafka_broker_home: /usr/hdp/current/kafka-broker/
 
+# fastcapa settings
+fastcapa_portmask: 0x01
+fastcapa_kafka_config: /etc/fastcapa.conf
+fastcapa_topic: pcap
+fastcapa_burst_size: 32
+fastcapa_nb_rx_desc: 1024
+fastcapa_nb_rx_queue: 1
+fastcapa_tx_ring_size: 2048
+
 # dummy variables for pycapa's dependence on ambari_gather_facts
 cluster_name: dummy
 namenode_host: dummy

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/.gitignore
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/.gitignore b/metron-sensors/fastcapa/.gitignore
index 2efc4c0..29a2c88 100644
--- a/metron-sensors/fastcapa/.gitignore
+++ b/metron-sensors/fastcapa/.gitignore
@@ -1,3 +1,5 @@
 roles
 .vagrant
 *.retry
+build
+*.out

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/Makefile
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/Makefile b/metron-sensors/fastcapa/Makefile
index 90ecb17..6ce3c44 100644
--- a/metron-sensors/fastcapa/Makefile
+++ b/metron-sensors/fastcapa/Makefile
@@ -18,9 +18,10 @@
 all: src
 
 src:
-	cd src; make
+	cd src; make O=../build
 
 clean:
 	cd src; make clean
+	rm -rf build
 
 .PHONY: all src clean

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/README.md
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/README.md b/metron-sensors/fastcapa/README.md
index acfa991..74e7809 100644
--- a/metron-sensors/fastcapa/README.md
+++ b/metron-sensors/fastcapa/README.md
@@ -1,20 +1,441 @@
 Fastcapa
 ========
 
-Fastcapa is an Apache Metron sensor that performs fast network packet capture by leveraging Linux kernel-bypass and user space networking technology.  
-
-The sensor will bind to a network interface, capture network packets, and send the raw packet data to Kafka.  This provides a scalable mechanism for ingesting high-volumes of network packet data into a Hadoop-y cluster.
+Fastcapa is a probe that performs fast network packet capture by leveraging Linux kernel-bypass and user space networking technology.  The probe will bind to a network interface, capture network packets, and send the raw packet data to Kafka.  This provides a scalable mechanism for ingesting high volumes of network packet data into a Hadoop-y cluster.
 
 Fastcapa leverages the Data Plane Development Kit ([DPDK](http://dpdk.org/)).  DPDK is a set of libraries and drivers to perform fast packet processing in Linux user space.  
 
-Getting Started
----------------
+* [Quick Start](#quick-start)
+* [Requirements](#requirements)
+* [Installation](#installation)
+* [Usage](#usage)
+* [How It Works](#how-it-works)
+* [Performance](#performance)
+* [FAQs](#faqs)
+
+Quick Start
+-----------
+
+The quickest way to see Fastcapa in action is to use a Virtualbox environment on your local machine.  The necessary files and instructions to do this are located at [`metron-deployment/vagrant/fastcapa-test-platform`](../../metron-deployment/vagrant/fastcapa-test-platform).  All you need to do is execute the following.
+```
+cd metron-deployment/vagrant/fastcapa-test-platform
+vagrant up
+```
+
+This environment sets up two nodes.  One node produces network packets over a network interface.  The second node uses Fastcapa to capture those packets and write them to a Kafka broker running on the first node.  Basic validation is performed to ensure that Fastcapa is able to land packet data in Kafka.
+
+Requirements
+------------
+
+The following system requirements must be met to run the Fastcapa probe.
 
-The quickest way to get up and running is to use a Virtualbox environment on your local machine.  The necessary files and instructions to do this are located at [`metron-deployment/vagrant/fastcapa-vagrant`](../../metron-deployment/vagrant/fastcapa-test-platform).  
+* Linux kernel >= 2.6.34 
+* A [DPDK-supported ethernet device (NIC)](http://dpdk.org/doc/nics).
+* Port(s) on the ethernet device that can be dedicated for exclusive use by Fastcapa
 
 Installation
 ------------
 
 The process of installing Fastcapa has a fair number of steps and involves building DPDK, loading specific kernel modules, enabling huge page memory, and binding compatible network interface cards.
 
-The best documentation is code that actually does this for you.  An Ansible role that performs the entire installation procedure can be found at [`metron-deployment/roles/fastcapa`](../../metron-deployment/roles/fastcapa).  Use this to install Fastcapa or as a guide for manual installation.
+### Automated Installation
+
+The best documentation is code that actually does this for you.  An Ansible role that performs the entire installation procedure can be found at [`metron-deployment/roles/fastcapa`](../../metron-deployment/roles/fastcapa).  Use this to install Fastcapa or as a guide for manual installation.  The automated installation assumes CentOS 7.1 and is directly tested against [bento/centos-7.1](https://atlas.hashicorp.com/bento/boxes/centos-7.1).
+
+### Manual Installation
+
+The following manual installation steps assume that they are executed on CentOS 7.1.  Some minor differences may result if you use a different Linux distribution.
+
+* [Enable Huge Pages](#enable-huge-pages)
+* [Install DPDK](#install-dpdk)
+* [Install Librdkafka](#install-librdkafka)
+* [Install Fastcapa](#install-fastcapa)
+
+#### Enable Huge Pages
+
+The probe performs its own memory management by leveraging huge pages.  In Linux, huge pages can be allocated either dynamically or on boot.  It is recommended that they be allocated on boot to increase the chance that a larger, physically contiguous chunk of memory can be allocated.
+
+The size of the huge pages that are supported will vary based on your CPU.  These typically include 2 MB and 1 GB pages.  For better performance, allocate 1 GB huge pages if supported by your CPU.
+
+1. Ensure that your CPU supports 1 GB huge pages.  The `pdpe1gb` CPU flag indicates whether or not the CPU supports them.
+    ```
+    grep --color=always pdpe1gb /proc/cpuinfo | uniq
+    ```
+
+2. Add the following boot parameters to the Linux kernel.  Edit `/etc/default/grub` and add the additional kernel parameters to the line starting with `GRUB_CMDLINE_LINUX`.
+    ```
+    GRUB_CMDLINE_LINUX=... default_hugepagesz=1G hugepagesz=1G hugepages=16
+    ```
+
+3. Rebuild the GRUB configuration, then reboot.  The location of the GRUB configuration file will differ across Linux distributions.
+    ```
+    cp /etc/grub2-efi.cfg /etc/grub2-efi.cfg.orig
+    /sbin/grub2-mkconfig -o /etc/grub2-efi.cfg
+    ```
+
+4. Once the host has been rebooted, ensure that the huge pages were successfully allocated.
+    ```
+    $ grep HugePage /proc/meminfo
+    AnonHugePages:    933888 kB
+    HugePages_Total:      16
+    HugePages_Free:        0
+    HugePages_Rsvd:        0
+    HugePages_Surp:        0
+    ```
+
+    The allocated huge pages should be distributed fairly evenly across the NUMA nodes.  In this example, a total of 16 were requested and 8 have been assigned on each of the 2 NUMA nodes.
+    ```
+    $ cat /sys/devices/system/node/node*/hugepages/hugepages-1048576kB/nr_hugepages
+    8
+    8
+    ```
+
+5. Once the huge pages have been reserved, they need to be mounted to make them available to the probe.
+    ```
+    cp /etc/fstab /etc/fstab.orig
+    mkdir -p /mnt/huge_1GB
+    echo "nodev /mnt/huge_1GB hugetlbfs pagesize=1GB 0 0" >> /etc/fstab
+    mount -fav
+    ```
+
+#### Install DPDK
+
+1. Install the required dependencies.
+    ```
+    yum -y install "@Development tools"
+    yum -y install pciutils net-tools glib2 glib2-devel git
+    yum -y install kernel kernel-devel kernel-headers
+    ```
+
+2. Decide where DPDK will be installed.
+    ```
+    export DPDK_HOME=/usr/local/dpdk/
+    ```
+
+3. Download, build and install DPDK.
+    ```
+    wget http://fast.dpdk.org/rel/dpdk-16.11.1.tar.xz -O - | tar -xJ
+    cd dpdk-stable-16.11.1/
+    make config install T=x86_64-native-linuxapp-gcc DESTDIR=$DPDK_HOME
+    ```
+
+4. Find the PCI address of the ethernet device that you plan on using to capture network packets.  In the following example I plan on binding `enp9s0f0`, which has a PCI address of `09:00.0`.
+    ```
+    $ lspci | grep "VIC Ethernet"
+    09:00.0 Ethernet controller: Cisco Systems Inc VIC Ethernet NIC (rev a2)
+    0a:00.0 Ethernet controller: Cisco Systems Inc VIC Ethernet NIC (rev a2)
+    ```
+    
+5. Bind the device.  Replace the device name and PCI address with what is appropriate for your environment.
+    ```
+    ifdown enp9s0f0
+    modprobe uio_pci_generic
+    $DPDK_HOME/sbin/dpdk-devbind --bind=uio_pci_generic "09:00.0"
+    ```
+
+6. Ensure that the device was bound. It should be shown as a 'network device using DPDK-compatible driver.'
+    ``` 
+    $ dpdk-devbind --status
+    Network devices using DPDK-compatible driver
+    ============================================
+    0000:09:00.0 'VIC Ethernet NIC' drv=uio_pci_generic unused=enic
+    Network devices using kernel driver
+    ===================================
+    0000:01:00.0 'I350 Gigabit Network Connection' if=eno1 drv=igb unused=uio_pci_generic
+    ```
+
+#### Install Librdkafka
+
+The probe has been tested with [Librdkafka 0.9.4](https://github.com/edenhill/librdkafka/releases/tag/v0.9.4).
+
+1. Choose an installation path.  In this example, the libs will actually be installed at `/usr/local/lib`; note that `lib` is appended to the prefix. 
+    ```
+    export RDK_PREFIX=/usr/local
+    ```
+
+2. Download, build and install.
+    ```
+    wget https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz  -O - | tar -xz
+    cd librdkafka-0.9.4/
+    ./configure --prefix=$RDK_PREFIX
+    make 
+    make install
+    ```
+
+3. Ensure that the installation location is on the search path for the runtime shared library loader.
+    ```
+    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$RDK_PREFIX/lib
+    ```
+
+#### Install Fastcapa
+
+1. Set the required environment variables.
+    ```
+    export RTE_SDK=$DPDK_HOME/share/dpdk/
+    export RTE_TARGET=x86_64-native-linuxapp-gcc
+    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$RDK_PREFIX/lib
+    ```
+
+2. Build Fastcapa.  The resulting binary will be placed at `build/app/fastcapa`.
+    ```
+    cd incubator-metron/metron-sensors/fastcapa
+    make
+    ```
+
+Usage
+-----
+
+Follow these steps to run Fastcapa.
+
+1. Create a configuration file that at a minimum specifies your Kafka broker.  An example configuration file, `conf/fastcapa.conf`, is available that documents other useful parameters.
+    ```
+    [kafka-global]
+    metadata.broker.list = kafka-broker1:9092
+    ```
+    
+2. Bind the capture device.  This is only needed if the device is not already bound.  In this example, the device `enp9s0f0` with a PCI address of `09:00.0` is bound.  Use values specific to your environment.
+    ```
+    ifdown enp9s0f0
+    modprobe uio_pci_generic
+    $DPDK_HOME/sbin/dpdk-devbind --bind=uio_pci_generic "09:00.0"
+    ```
+
+3. Run Fastcapa.
+    ```
+    fastcapa -c 0x03 --huge-dir /mnt/huge_1GB -- -p 0x01 -t pcap -c /etc/fastcapa.conf
+    ```
+
+4. Terminate Fastcapa with `SIGINT` or by entering `CTRL-C`.  The probe will cleanly shut down all of the workers and allow the backlog of packets to drain.  To terminate the process without clearing the queue, send a `SIGKILL` or enter `killall -9 fastcapa`.
+
+### Parameters
+
+Fastcapa accepts three sets of parameters.  
+
+1. Command-line parameters passed directly to DPDK's Environment Abstraction Layer (EAL)
+2. Command-line parameters that define how Fastcapa will interact with DPDK.  These parameters are separated from the EAL parameters on the command line by a double-dash (`--`).
+3. A configuration file that defines how Fastcapa interacts with Librdkafka.
+
+#### Environment Abstraction Layer Parameters
+
+The most commonly used EAL parameter involves specifying which logical CPU cores should be used for processing.  This can be specified in any of the following ways.
+```
+  -c COREMASK         Hexadecimal bitmask of cores to run on
+  -l CORELIST         List of cores to run on
+                      The argument format is <c1>[-c2][,c3[-c4],...]
+                      where c1, c2, etc are core indexes between 0 and 128
+  --lcores COREMAP    Map lcore set to physical cpu set
+                      The argument format is
+                            '<lcores[@cpus]>[<,lcores[@cpus]>...]'
+                      lcores and cpus list are grouped by '(' and ')'
+                      Within the group, '-' is used for range separator,
+                      ',' is used for single number separator.
+                      '( )' can be omitted for single element group,
+                      '@' can be omitted if cpus and lcores have the same value                     
+```
+
+To get more information about other EAL parameters, run the following.
+```
+fastcapa -h
+```
+
+#### Fastcapa-Core Parameters
+
+| Name | Command | Description | Default |
+|--------------------------|-----------------|---------------------------------------------------------------------------------------------------------------------------|---------|
+| Port Mask | -p PORT_MASK | A bit mask identifying which ports to bind. | 0x01 |
+| Burst Size | -b BURST_SIZE | Maximum number of packets to receive at one time. | 32 |
+| Receive Descriptors | -r NB_RX_DESC | The number of descriptors for each receive queue (the size of the receive queue).  Limited by the ethernet device in use. | 1024 |
+| Transmission Ring Size | -x TX_RING_SIZE | The size of each transmission ring.  This must be a power of 2. | 2048 |
+| Number of Receive Queues | -q NB_RX_QUEUE | Number of receive queues to use for each port.  Limited by the ethernet device in use. | 2 |
+| Kafka Topic | -t KAFKA_TOPIC | The name of the Kafka topic. | pcap |
+| Configuration File | -c KAFKA_CONF | Path to a file containing configuration values. |  |
+| Stats | -s KAFKA_STATS | Appends performance metrics in the form of JSON strings to the specified file. |  |
+
+To get more information about the Fastcapa-specific parameters, run the following.  Note that this puts the `-h` after the double-dash (`--`).
+```
+fastcapa -- -h
+```
+
+#### Fastcapa-Kafka Configuration File
+
+The path to the configuration file is specified with the `-c` command line argument.  The file can contain any global or topic-specific, producer-focused [configuration values accepted by Librdkafka](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).  
+
+The configuration file is a `.ini`-like GLib key file.  The global configuration values should be placed under a `[kafka-global]` header and topic-specific values should be placed under `[kafka-topic]`.
+
+A minimally viable configuration file would only need to include the Kafka broker to connect to.
+```
+[kafka-global]
+metadata.broker.list = kafka-broker1:9092, kafka-broker2:9092
+```
+
+The configuration parameters that are important for either basic functioning or performance tuning of Fastcapa include the following.
+
+Global configuration values should be placed under the `[kafka-global]` header.
+
+| *Name* | *Description* | *Default* |
+|--------|---------------|-----------|
+| metadata.broker.list | Initial list of brokers as a CSV list of broker host or host:port |  |
+| client.id | Client identifier. |  |
+| queue.buffering.max.messages | Maximum number of messages allowed on the producer queue | 100000 |
+| queue.buffering.max.ms | Maximum time, in milliseconds, for buffering data on the producer queue | 1000 |
+| message.copy.max.bytes | Maximum size for message to be copied to buffer. Messages larger than this will be passed by reference (zero-copy) at the expense of larger iovecs. | 65535 |
+| batch.num.messages | Maximum number of messages batched in one MessageSet | 10000 | 
+| statistics.interval.ms | How often statistics are emitted; 0 = never | 0 |
+| compression.codec | Compression codec to use for compressing message sets; {none, gzip, snappy, lz4 } | none |
+
+
+Topic configuration values should be placed under the `[kafka-topic]` header.
+
+| *Name* | *Description* | *Default* |
+|--------|---------------|-----------|
+| compression.codec | Compression codec to use for compressing message sets; {none, gzip, snappy, lz4 } | none |
+| request.required.acks | How many acknowledgements the leader broker must receive from ISR brokers before responding to the request; { 0 = no ack, 1 = leader ack, -1 = all ISRs } | 1 |
+| message.timeout.ms | Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. | 300000 |
+| queue.buffering.max.kbytes | Maximum total message size sum allowed on the producer queue |  |
+
+### Output
+
+When running the probe, some basic counters are output to stdout.  During normal operation these values will be much larger than shown here.
+
+```
+      ------ in ------  --- queued ---  ----- out -----  ---- drops ----
+[nic]                8               -                -                -
+[rx]                 8               0                8                0
+[tx]                 8               0                8                0
+[kaf]                8               1                7                0
+```
+
+* `[nic]` + `in`: The ethernet device is reporting that it has seen 8 packets.
+* `[rx]` + `in`: The receive workers have consumed 8 packets from the device.
+* `[rx]` + `out`: The receive workers have enqueued 8 packets onto the transmission rings.
+* `[rx]` + `drops`: If the transmission rings become full, the receive workers are prevented from enqueuing additional packets; the excess packets are dropped.  This value will never decrease.
+* `[tx]` + `in`: The transmission workers have consumed 8 packets.
+* `[tx]` + `out`: The transmission workers have packaged 8 packets into Kafka messages.
+* `[tx]` + `drops`: Incremented when the Kafka client library accepts fewer packets than expected.  This value can increase or decrease over time as additional packets are acknowledged by the Kafka client library at a later point in time.
+* `[kaf]` + `in`: The Kafka client library has received 8 packets.
+* `[kaf]` + `out`: A total of 7 packets have successfully reached Kafka.
+* `[kaf]` + `queued`: There is 1 packet within the `rdkafka` queue waiting to be sent.
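+
+Each row maps directly onto the `struct app_stats` counters (`in`, `depth`, `out`, `drops`) defined in `src/types.h`.  As a minimal, illustrative sketch (the print format below is an assumption, not Fastcapa's actual output code), a row could be rendered like this.
+```
+#include <inttypes.h>
+#include <stdint.h>
+#include <stdio.h>
+
+struct app_stats {        /* mirrors src/types.h */
+    uint64_t in;
+    uint64_t out;
+    uint64_t depth;
+    uint64_t drops;
+};
+
+/* print one row of the table above; columns are in, queued, out, drops.
+ * the real output prints '-' for counters a stage does not track */
+static void print_stats_row(const char *name, const struct app_stats *s)
+{
+    printf("%-5s %16" PRIu64 " %16" PRIu64 " %16" PRIu64 " %16" PRIu64 "\n",
+        name, s->in, s->depth, s->out, s->drops);
+}
+```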
+
+How It Works
+------
+
+The probe leverages a poll-mode, burst-oriented mechanism to capture packets from a network interface and transmit them efficiently to a Kafka topic.  Each packet is wrapped within a single Kafka message and the current timestamp, as epoch microseconds in network byte order, is attached as the message's key.
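+
+As a minimal sketch, assuming `gettimeofday` and the Linux `htobe64` helper (the helper Fastcapa actually uses may differ), such a key could be built as follows.
+```
+#include <endian.h>
+#include <stdint.h>
+#include <sys/time.h>
+
+/* current time as epoch microseconds, converted to network
+ * (big-endian) byte order for use as the Kafka message key */
+static uint64_t current_time_key(void)
+{
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    uint64_t epoch_us = ((uint64_t) tv.tv_sec * 1000000ULL) + (uint64_t) tv.tv_usec;
+    return htobe64(epoch_us);
+}
+```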
+
+The probe leverages Receive Side Scaling (RSS), a feature provided by some ethernet devices that allows processing of received data to occur across multiple processes and logical cores.  It does this by running a hash function on each packet; the hash value assigns the packet to one of possibly many receive queues.  The total number and size of these receive queues are limited by the ethernet device in use.  More capable ethernet devices will offer a greater number of larger receive queues.
+
+ * Increasing the number of receive queues allows for greater parallelization of receive side processing.  
+ * Increasing the size of each receive queue can allow the probe to handle larger, temporary spikes of network packets that can often occur.
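+
+To make this concrete, the following is a sketch of enabling RSS with the DPDK 16.11-era API.  The hash fields chosen here (`ETH_RSS_IP | ETH_RSS_TCP | ETH_RSS_UDP`) are an assumption, and `nb_rx_queue` and `nb_rx_desc` correspond to the `-q` and `-r` parameters described above.
+```
+#include <rte_ethdev.h>
+#include <rte_mempool.h>
+
+static int configure_rss(uint8_t port, uint16_t nb_rx_queue,
+                         uint16_t nb_rx_desc, struct rte_mempool *pool)
+{
+    uint16_t q;
+    struct rte_eth_conf conf = {
+        .rxmode = { .mq_mode = ETH_MQ_RX_RSS },
+        .rx_adv_conf = {
+            .rss_conf = { .rss_hf = ETH_RSS_IP | ETH_RSS_TCP | ETH_RSS_UDP }
+        }
+    };
+
+    // hash received packets across 'nb_rx_queue' receive queues
+    int rc = rte_eth_dev_configure(port, nb_rx_queue, 1, &conf);
+    if (rc < 0) {
+        return rc;
+    }
+
+    // size each receive queue with 'nb_rx_desc' descriptors
+    for (q = 0; q < nb_rx_queue; q++) {
+        rc = rte_eth_rx_queue_setup(port, q, nb_rx_desc,
+            rte_eth_dev_socket_id(port), NULL, pool);
+        if (rc < 0) {
+            return rc;
+        }
+    }
+    return 0;
+}
+```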
+
+A set of receive workers, each assigned to a unique logical core, are responsible for fetching packets from the receive queues.  There can only be one receive worker for each receive queue.  The receive workers continually poll the receive queues and attempt to fetch multiple packets on each request.  The maximum number of packets fetched in one request is known as the 'burst size'.  If the receive worker actually receives 'burst size' packets, then it is likely that the queue is under pressure and more packets are available.  In this case the worker immediately fetches another 'burst size' set of packets.  It repeats this process up to a fixed number of times while the queue is under pressure.
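+
+A sketch of that polling strategy follows.  It assumes the standard `rte_eth_rx_burst` and `rte_ring_enqueue_burst` calls and the `rx_worker_params` struct from `src/types.h` shown earlier in this commit; `MAX_EXTRA_BURSTS` is an illustrative constant, not Fastcapa's actual limit.
+```
+#include <rte_ethdev.h>
+#include <rte_mbuf.h>
+#include <rte_ring.h>
+
+#define MAX_EXTRA_BURSTS 8   /* illustrative only */
+
+static void rx_poll_once(uint8_t port, struct rx_worker_params *params)
+{
+    struct rte_mbuf *pkts[params->burst_size];
+    unsigned extra = 0;
+    uint16_t nb_in;
+
+    do {
+        // fetch up to 'burst size' packets from the receive queue
+        nb_in = rte_eth_rx_burst(port, params->queue_id, pkts, params->burst_size);
+        params->stats.in += nb_in;
+
+        // enqueue onto the transmit ring; a full ring drops the excess
+        unsigned nb_out = rte_ring_enqueue_burst(params->output_ring, (void **) pkts, nb_in);
+        params->stats.out += nb_out;
+        params->stats.drops += nb_in - nb_out;
+        while (nb_out < nb_in) {
+            rte_pktmbuf_free(pkts[nb_out++]);
+        }
+
+        // a full burst suggests the queue is under pressure; poll again
+    } while (nb_in == params->burst_size && ++extra < MAX_EXTRA_BURSTS);
+}
+```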
+
+The receive workers then enqueue the received packets into a fixed size ring buffer known as a transmit ring.  There is always one transmit ring for each receive queue.  A set of transmit workers then dequeue packets from the transmit rings.  There can be one or more transmit workers assigned to any single transmit ring.  Each transmit worker has its own unique connection to Kafka.
+
+* Increasing the number of transmit workers allows for greater parallelization when writing data to Kafka.
+* Increasing the size of the transmit rings allows the probe to better handle temporary interruptions and latency when writing to Kafka.
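+
+The sketch below shows how such a ring might be created, assuming DPDK's `rte_ring_create`; the naming scheme is illustrative.  A single-producer flag fits the one-receive-worker-per-ring design, while dequeues stay multi-consumer (the default) so that several transmit workers can share one ring.
+```
+#include <rte_lcore.h>
+#include <rte_ring.h>
+#include <stdio.h>
+
+/* one transmit ring per receive queue; 'tx_ring_size' corresponds to
+ * -x TX_RING_SIZE and must be a power of 2 */
+static struct rte_ring *create_tx_ring(uint16_t queue_id, unsigned tx_ring_size)
+{
+    char name[32];
+    snprintf(name, sizeof(name), "tx_ring_%u", (unsigned) queue_id);
+
+    // single producer (one receive worker), multi-consumer by default
+    return rte_ring_create(name, tx_ring_size, rte_socket_id(), RING_F_SP_ENQ);
+}
+```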
+
+Once the transmit workers hand the network packets to the Kafka client library, the library internally maintains its own send queue of messages.  Multiple threads are then responsible for managing this queue and creating batches of messages that are sent in bulk to a Kafka broker.  No control is exercised over this additional send queue and its worker threads, which can be an impediment to performance.  This is an opportunity for improvement that could be addressed as follow-on work.
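+
+For reference, a transmit worker's hand-off to the client library might look roughly like the sketch below.  It uses the standard `rd_kafka_produce`/`rd_kafka_poll` API with a copied payload and the timestamp key sketched earlier; Fastcapa's actual hand-off (see `src/kafka.c` in this commit) batches and handles errors more carefully.
+```
+#include <librdkafka/rdkafka.h>
+#include <rte_mbuf.h>
+#include <stdint.h>
+
+/* wrap one packet in one Kafka message; 'current_time_key' is the
+ * timestamp-key sketch shown earlier */
+static int send_packet(rd_kafka_t *rk, rd_kafka_topic_t *rkt, struct rte_mbuf *pkt)
+{
+    uint64_t key = current_time_key();
+
+    // enqueue the message onto librdkafka's internal send queue
+    int rc = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
+        rte_pktmbuf_mtod(pkt, void *), rte_pktmbuf_data_len(pkt),
+        &key, sizeof(key), NULL);
+
+    // give the client library a chance to fire delivery callbacks
+    rd_kafka_poll(rk, 0);
+    return rc;
+}
+```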
+
+Performance
+-----------------
+Beyond tuning the parameters previously described, the following should be carefully considered to achieve maximum performance.
+
+### Kafka Partitions
+
+Parallelizing access to a topic in Kafka is achieved by defining multiple partitions.  The greater the number of partitions, the more parallelizable access to that topic becomes.  To achieve high throughput it is important to ensure that the Kafka topic in use has a large number of partitions, evenly distributed across each of the nodes in your Kafka cluster.
+
+The specific number of partitions needed will differ for each environment, but at least 128 partitions have been shown to significantly increase performance in some environments.
+
+### Physical Layout
+
+It is important to note the physical layout of the hardware when assigning worker cores to the probe.  The worker cores should be on the same NUMA node or socket as the ethernet device itself.  Assigning logical cores across NUMA boundaries can significantly impede performance.
+
+The following commands can help identify logical cores that are located on the same NUMA node or socket as the ethernet device.  These commands should be run while the device is still managed by the kernel, before binding the interface to DPDK.
+```
+$ cat /sys/class/net/enp9s0f0/device/local_cpulist
+0-7,16-23
+```
+
+The following command can be used to better understand the physical layout of the CPU in relation to NUMA nodes.
+```
+$ lscpu
+...
+NUMA node0 CPU(s):     0-7,16-23
+NUMA node1 CPU(s):     8-15,24-31
+```
+
+In this example `enp9s0f0` is located on NUMA node 0 and is local to the logical cores 0-7 and 16-23.  You should choose worker cores from this list.
+
+### CPU Isolation
+
+Once you have chosen the logical cores that are local to the ethernet device, it is also beneficial to isolate those cores so that the Linux kernel scheduler does not attempt to run tasks on them.  This can be done using the `isolcpus` kernel boot parameter.
+```
+isolcpus=0,1,2,3,4,5,6,7
+```
+
+### Device Limitations
+
+Check the probe's output to ensure that you are not running into device limitations.  While you may have specified 16 receive queues on the command line, your device may not support that number.  This is especially true for the number of receive queues and receive descriptors.
+
+The following example shows the output when the number of receive descriptors requested is greater than what can be supported by the device.  In many cases the probe will not terminate, but will choose the maximum allowable value and continue.  This behavior is specific to the underlying device driver in use.
+```
+PMD: rte_enic_pmd: Rq 0 Scatter rx mode enabled
+PMD: rte_enic_pmd: Rq 0 Scatter rx mode not being used
+PMD: rte_enic_pmd: Number of rx_descs too high, adjusting to maximum
+PMD: rte_enic_pmd: Using 512 rx descriptors (sop 512, data 0)
+PMD: rte_enic_pmd: Rq 1 Scatter rx mode enabled
+PMD: rte_enic_pmd: Rq 1 Scatter rx mode not being used
+PMD: rte_enic_pmd: Number of rx_descs too high, adjusting to maximum
+PMD: rte_enic_pmd: Using 512 rx descriptors (sop 512, data 0)
+PMD: rte_enic_pmd: TX Queues - effective number of descs:32
+PMD: rte_enic_pmd: vNIC resources used:  wq 1 rq 4 cq 3 intr 0
+```
+
+### More Information
+
+More information on this topic can be found in [DPDK's Getting Started Guide](http://dpdk.org/doc/guides/linux_gsg/nic_perf_intel_platform.html).
+
+FAQs
+----
+
+### No free hugepages reported
+
+Problem: When executed, `fastcapa` fails with the following error message.
+```
+EAL: No free hugepages reported in hugepages-1048576kB
+PANIC in rte_eal_init():
+Cannot get hugepage information
+```
+
+Solution: This can occur if any process that has been allocated huge pages crashes and fails to return the resources.
+
+* Delete the huge page files that are not in use.
+    ```
+    rm -f /mnt/huge_1GB/rtemap_*
+    ```
+      
+* If the first option does not work, re-mount the `hugetlbfs` file system.
+    ```
+    umount -a -t hugetlbfs
+    mount -a
+    ```
+
+### No ethernet ports detected
+
+Problem: When executed, `fastcapa` fails with the following error message.
+```
+EAL: Error - exiting with code: 1
+  Cause: No ethernet ports detected.
+```
+
+* Solution 1: The `uio_pci_generic` kernel module has not been loaded.
+```
+modprobe uio_pci_generic
+```
+
+* Solution 2: Ensure that the ethernet device is bound. Re-bind if necessary.
+```
+ dpdk-devbind --unbind "09:00.0"
+ dpdk-devbind --bind=uio_pci_generic "09:00.0"
+ dpdk-devbind --status
+```
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/conf/fastcapa.conf
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/conf/fastcapa.conf b/metron-sensors/fastcapa/conf/fastcapa.conf
new file mode 100644
index 0000000..df45fcc
--- /dev/null
+++ b/metron-sensors/fastcapa/conf/fastcapa.conf
@@ -0,0 +1,124 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+#
+# kafka global settings
+#
+[kafka-global]
+
+# Initial list of brokers as a CSV list of broker host or host:port.
+# Type: string
+metadata.broker.list = localhost:9092
+
+# Client identifier.
+# Type: string
+# Default: rdkafka
+client.id = fastcapa
+
+# Compression codec to use for compressing message sets. This is the default value
+# for all topics and may be overridden by the topic configuration property compression.codec.
+# Type: enum value { none, gzip, snappy, lz4 }
+# Default: none
+compression.codec = snappy
+
+# Maximum number of messages batched in one MessageSet. The total MessageSet size is
+# also limited by message.max.bytes. Increase for better compression.
+# Type: integer
+# Default: 10000
+batch.num.messages = 500000
+
+# Maximum number of messages allowed on the producer queue.
+# Type: integer
+# Default: 100000
+#queue.buffering.max.messages = 5000000
+
+# Maximum total message size sum allowed on the producer queue.
+# Type: integer
+#queue.buffering.max.kbytes = 2097151
+
+# Maximum time, in milliseconds, for buffering data on the producer queue.
+# Type: integer
+# Default: 1000
+#queue.buffering.max.ms = 5000
+
+# Maximum size for message to be copied to buffer. Messages larger than this will be
+# passed by reference (zero-copy) at the expense of larger iovecs.
+# Type: integer
+# Default: 65535
+#message.copy.max.bytes = 65535
+
+# Maximum transmit message size.
+# Type: integer
+# Default: 1000000
+#message.max.bytes = 1000000000
+
+# How many times to retry sending a failing MessageSet. Note: retrying may cause reordering.
+# Type: integer
+# Default: 2
+#message.send.max.retries = 5
+
+# The backoff time in milliseconds before retrying a message send.
+# Type: integer
+# Default: 100
+#retry.backoff.ms = 500
+
+# How often statistics are emitted; 0 = never. The application also needs to register
+# a stats callback using rd_kafka_conf_set_stats_cb(). The granularity is 1000ms.
+# Type: integer
+# Default: 0
+#statistics.interval.ms = 5000
+
+# Protocol used to communicate with brokers.
+# Type: enum value { plaintext, ssl, sasl_plaintext, sasl_ssl }
+# Default: plaintext
+#security.protocol = plaintext
+
+# Timeout for network requests. 
+# Type: integer
+# Default: 60000
+socket.timeout.ms = 6000
+
+# Only provide delivery reports for failed messages.
+# Type: boolean
+# Default: false
+#delivery.report.only.error = false
+
+#
+# kafka topic settings
+#
+[kafka-topic]
+
+# This field indicates how many acknowledgements the leader broker must receive from ISR brokers
+# before responding to the request:
+#   0=Broker does not send any response/ack to client,
+#   1=Only the leader broker will need to ack the message,
+#  -1 or all=broker will block until message is committed by all in sync replicas (ISRs) or broker's in.sync.replicas setting before sending response.
+# Type: integer
+#request.required.acks = 1
+
+# Local message timeout. This value is only enforced locally and limits the time a produced message
+# waits for successful delivery. A time of 0 is infinite.
+# Type: integer
+# Default: 300000
+#message.timeout.ms = 300000
+
+# Report offset of produced message back to application. The application must use the
+# dr_msg_cb to retrieve the offset from rd_kafka_message_t.offset.
+# Type: boolean
+# Default: false
+#produce.offset.report = false
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/conf/localhost.kafka
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/conf/localhost.kafka b/metron-sensors/fastcapa/conf/localhost.kafka
deleted file mode 100644
index b50c20e..0000000
--- a/metron-sensors/fastcapa/conf/localhost.kafka
+++ /dev/null
@@ -1,67 +0,0 @@
-#
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-
-#
-# kafka global settings
-#
-[kafka-global]
-
-# identifies the client to kafka
-client.id = pcap-localhost
-
-# initial list of kafka brokers
-metadata.broker.list = localhost:9092
-
-# max number of messages allowed on the producer queue
-queue.buffering.max.messages = 10000000
-
-# maximum time, in milliseconds, for buffering data on the producer queue
-queue.buffering.max.ms = 1000
-
-# compression codec = none, gzip or snappy
-compression.codec = snappy
-
-# maximum number of messages batched in one MessageSet (increase for better compression)
-batch.num.messages = 1000
-
-# max times to retry sending a failed message set
-message.send.max.retries = 10
-
-# backoff time before retrying a message send
-retry.backoff.ms = 250
-
-# how often statistics are emitted; 0 = never
-statistics.interval.ms = 0
-
-# only provide delivery reports for failed messages
-delivery.report.only.error = false
-
-#
-# kafka topic settings
-#
-[kafka-topic]
-
-# broker acks { 1 = leader ack, 0 = no acks, -1 = in sync replica ack }
-request.required.acks = 1
-
-# local message timeout. This value is only enforced locally and limits the time a
-# produced message waits for successful delivery. A time of 0 is infinite.
-message.timeout.ms = 300000
-
-# report offset of produced message back to application. The application must be
-# use the dr_msg_cb to retrieve the offset from rd_kafka_message_t.offset
-produce.offset.report = true

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/src/Makefile
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/Makefile b/metron-sensors/fastcapa/src/Makefile
index 9e7849d..e9bf877 100644
--- a/metron-sensors/fastcapa/src/Makefile
+++ b/metron-sensors/fastcapa/src/Makefile
@@ -46,7 +46,7 @@ ifeq ($(CONFIG_RTE_TOOLCHAIN_GCC),y)
 CFLAGS_main.o += -Wno-return-type
 endif
 
-#EXTRA_CFLAGS += -O3 -Wfatal-errors
-EXTRA_CFLAGS += -g -Wall
+EXTRA_CFLAGS += -O3 -Wall
+#EXTRA_CFLAGS += -g -Wall
 
 include $(RTE_SDK)/mk/rte.extapp.mk

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/src/args.c
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/args.c b/metron-sensors/fastcapa/src/args.c
index 04fb8d0..59d2748 100644
--- a/metron-sensors/fastcapa/src/args.c
+++ b/metron-sensors/fastcapa/src/args.c
@@ -26,19 +26,30 @@ typedef int bool;
 /*
  * Print usage information to the user.
  */
-void print_usage(const char* prgname)
+static void print_usage(void)
 {
-    printf("%s [EAL options] -- [APP options]\n"
-           "  -p PORTMASK     hex bitmask of ports to bind  [0x01]\n"
-           "  -t KAFKATOPIC   kafka topic                   [pcap]\n"
-           "  -c KAFKACONF    kafka config file             [conf/kafka.conf]\n",
-        prgname);
+    printf("fastcapa [EAL options] -- [APP options]\n"
+           "  -p PORT_MASK     bitmask of ports to bind                     [%s]\n"
+           "  -b BURST_SIZE    max packets to retrieve at a time            [%d]\n"
+           "  -r NB_RX_DESC    num of descriptors for receive ring          [%d]\n"
+           "  -x TX_RING_SIZE  size of tx rings (must be a power of 2)      [%d]\n"
+           "  -q NB_RX_QUEUE   num of receive queues for each device        [%d]\n"
+           "  -t KAFKA_TOPIC   name of the kafka topic                      [%s]\n"
+           "  -c KAFKA_CONF    file containing configs for kafka client         \n"
+           "  -s KAFKA_STATS   append kafka client stats to a file              \n"
+           "  -h               print this help message                          \n",
+        STR(DEFAULT_PORT_MASK), 
+        DEFAULT_BURST_SIZE, 
+        DEFAULT_NB_RX_DESC,
+        DEFAULT_TX_RING_SIZE,
+        DEFAULT_NB_RX_QUEUE,
+        STR(DEFAULT_KAFKA_TOPIC));
 }
 
 /*
  * Parse the 'portmask' command line argument.
  */
-int parse_portmask(const char* portmask)
+static int parse_portmask(const char* portmask)
 {
     char* end = NULL;
     unsigned long pm;
@@ -74,7 +85,7 @@ int parse_args(int argc, char** argv)
     int opt;
     char** argvopt;
     int option_index;
-    char* prgname = argv[0];
+    int nb_workers;
     static struct option lgopts[] = {
         { NULL, 0, 0, 0 }
     };
@@ -84,62 +95,167 @@ int parse_args(int argc, char** argv)
 
     // parse arguments to this application
     argvopt = argv;
-    while ((opt = getopt_long(argc, argvopt, "p:b:t:c:", lgopts, &option_index)) != EOF) {
+    while ((opt = getopt_long(argc, argvopt, "hp:b:t:c:r:q:s:x:", lgopts, &option_index)) != EOF) {
         switch (opt) {
 
-        // portmask
-        case 'p':
-            app.enabled_port_mask = parse_portmask(optarg);
-            if (app.enabled_port_mask == 0) {
-                printf("Error: Invalid portmask: '%s'\n", optarg);
-                print_usage(prgname);
+            // help
+            case 'h':
+                print_usage();
                 return -1;
-            }
-            break;
-
-        // kafka topic
-        case 't':
-            app.kafka_topic = strdup(optarg);
-            if (!valid(app.kafka_topic)) {
-                printf("Error: Invalid kafka topic: '%s'\n", optarg);
-                print_usage(prgname);
-                return -1;
-            }
-            break;
-
-        // kafka config path
-        case 'c':
-            app.kafka_config_path = strdup(optarg);
-            if (!valid(app.kafka_config_path) || !file_exists(app.kafka_config_path)) {
-                printf("Error: Invalid kafka config: '%s'\n", optarg);
-                print_usage(prgname);
-                return -1;
-            }
-            break;
-
-        default:
-            printf("Error: Invalid argument: '%s'\n", optarg);
-            print_usage(prgname);
-            return -1;
-        }
+
+            // burst size
+            case 'b':
+                app.burst_size = atoi(optarg);
+                printf("[ -b BURST_SIZE ] defined as %d \n", app.burst_size);
+
+                if(app.burst_size < 1 || app.burst_size > MAX_BURST_SIZE) {
+                    fprintf(stderr, "Invalid burst size; burst=%u must be in [1, %u]. \n", app.burst_size, MAX_BURST_SIZE);
+                    print_usage();
+                    return -1;
+                }
+                break;
+
+            // number of receive descriptors
+            case 'r':
+                app.nb_rx_desc = atoi(optarg);
+                printf("[ -r NB_RX_DESC ] defined as %d \n", app.nb_rx_desc);
+
+                if (app.nb_rx_desc < 1) {
+                    fprintf(stderr, "Invalid num of receive descriptors: '%s' \n", optarg);
+                    print_usage();
+                    return -1;
+                }
+                break;
+
+            // size of each transmit ring
+            case 'x':
+                app.tx_ring_size = atoi(optarg);
+                printf("[ -x TX_RING_SIZE ] defined as %d \n", app.tx_ring_size);
+
+                // must be a power of 2 and not 0
+                if (app.tx_ring_size == 0 || (app.tx_ring_size & (app.tx_ring_size - 1)) != 0) {
+                    fprintf(stderr, "Invalid tx ring size (must be power of 2): '%s' \n", optarg);
+                    print_usage();
+                    return -1;
+                }
+                break;
+
+            // number of receive queues for each device
+            case 'q':
+                app.nb_rx_queue = atoi(optarg);
+                printf("[ -q NB_RX_QUEUE ] defined as %d \n", app.nb_rx_queue);
+
+                if (app.nb_rx_queue < 1) {
+                    fprintf(stderr, "Invalid num of receive queues: '%s' \n", optarg);
+                    print_usage();
+                    return -1;
+                }
+                break;
+
+            // port mask
+            case 'p':
+                app.enabled_port_mask = parse_portmask(optarg);
+                printf("[ -p PORT_MASK ] defined as %d \n", app.enabled_port_mask);
+
+                if (app.enabled_port_mask == 0) {
+                    fprintf(stderr, "Invalid portmask: '%s'\n", optarg);
+                    print_usage();
+                    return -1;
+                }
+                break;
+
+            // kafka topic
+            case 't':
+                app.kafka_topic = strdup(optarg);
+                printf("[ -t KAFKA_TOPIC ] defined as %s \n", app.kafka_topic);
+
+                if (!valid(app.kafka_topic)) {
+                    fprintf(stderr, "Invalid kafka topic: '%s'\n", optarg);
+                    print_usage();
+                    return -1;
+                }
+                break;
+
+            // kafka config path
+            case 'c':
+                app.kafka_config_path = strdup(optarg);
+                printf("[ -c KAFKA_CONFIG ] defined as %s \n", app.kafka_config_path);
+
+                if (!valid(app.kafka_config_path) || !file_exists(app.kafka_config_path)) {
+                    fprintf(stderr, "Invalid kafka config: '%s'\n", optarg);
+                    print_usage();
+                    return -1;
+                }
+                break;
+
+            // kafka stats path
+            case 's':
+                app.kafka_stats_path = strdup(optarg);
+                printf("[ -s KAFKA_STATS ] defined as %s \n", app.kafka_stats_path);
+                break;
+
+            default:
+                print_usage();
+                return -1;
+        }
     }
 
-    // check for required command-line arguments
+    // default PORT_MASK
     if (app.enabled_port_mask == 0) {
-        printf("Error: Missing -p PORTMASK\n");
-        print_usage(prgname);
-        return -1;
+        printf("[ -p PORT_MASK ] undefined; defaulting to %s \n", STR(DEFAULT_PORT_MASK));
+        app.enabled_port_mask = DEFAULT_PORT_MASK;
     }
 
+    // default KAFKA_TOPIC
     if (!valid(app.kafka_topic)) {
-        printf("Error: Missing -t KAFKATOPIC\n");
-        print_usage(prgname);
-        return -1;
+        printf("[ -t KAFKA_TOPIC ] undefined; defaulting to %s \n", STR(DEFAULT_KAFKA_TOPIC));
+        app.kafka_topic = STR(DEFAULT_KAFKA_TOPIC);
     }
 
-    argv[optind - 1] = prgname;
+    // default BURST_SIZE 
+    if (app.burst_size == 0) {
+        printf("[ -b BURST_SIZE ] undefined; defaulting to %d \n", DEFAULT_BURST_SIZE);
+        app.burst_size = DEFAULT_BURST_SIZE;
+    }
+
+    // default NB_RX_DESC
+    if (app.nb_rx_desc == 0) {
+        printf("[ -r NB_RX_DESC ] undefined; defaulting to %d \n", DEFAULT_NB_RX_DESC);
+        app.nb_rx_desc = DEFAULT_NB_RX_DESC;
+    }
+
+    // default NB_RX_QUEUE
+    if (app.nb_rx_queue == 0) {
+        printf("[ -q NB_RX_QUEUE ] undefined; defaulting to %d \n", DEFAULT_NB_RX_QUEUE);
+        app.nb_rx_queue = DEFAULT_NB_RX_QUEUE;
+    }
+
+    // default TX_RING_SIZE
+    if (app.tx_ring_size == 0) {
+        printf("[ -x TX_RING_SIZE ] undefined; defaulting to %u \n", DEFAULT_TX_RING_SIZE);
+        app.tx_ring_size = DEFAULT_TX_RING_SIZE;
+    }
+
+    // check number of ethernet devices
+    if (rte_eth_dev_count() == 0) {
+        rte_exit(EXIT_FAILURE, "No ethernet ports detected.\n");
+    }
+
+    // check number of workers
+    nb_workers = rte_lcore_count() - 1;
+    if (nb_workers < 1) {
+        rte_exit(EXIT_FAILURE, "Minimum 2 logical cores required. \n");
+    }
+
+    // need at least 1 worker for each receive queue
+    if(nb_workers < app.nb_rx_queue) {
+        rte_exit(EXIT_FAILURE, "Minimum 1 worker per receive queue; workers=%u rx_queues=%u. \n", 
+            nb_workers, app.nb_rx_queue);
+    }
 
     // reset getopt lib
     optind = 0;
+
     return 0;
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/src/args.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/args.h b/metron-sensors/fastcapa/src/args.h
index 36dde29..352b221 100644
--- a/metron-sensors/fastcapa/src/args.h
+++ b/metron-sensors/fastcapa/src/args.h
@@ -60,6 +60,19 @@
 #include <rte_lpm.h>
 #include <rte_string_fns.h>
 
+#define DEFAULT_BURST_SIZE 32
+#define DEFAULT_PORT_MASK 0x01
+#define DEFAULT_KAFKA_TOPIC pcap
+#define DEFAULT_NB_RX_QUEUE 2
+#define DEFAULT_NB_RX_DESC 1024
+#define DEFAULT_TX_RING_SIZE 2048
+#define DEFAULT_KAFKA_STATS_PATH 0
+
+#define MAX_BURST_SIZE 1024
+
+#define STR_EXPAND(tok) #tok
+#define STR(tok) STR_EXPAND(tok)
+
 /*
  * Logging definitions
  */
@@ -79,9 +92,31 @@
  * Application configuration parameters.
  */
 struct app_params {
+
+    /* The number of receive descriptors to allocate for the receive ring. */
+    uint16_t nb_rx_desc;
+
+    /* The number of receive queues to set up for each ethernet device. */
+    uint16_t nb_rx_queue;
+
+    /* The size of the transmit ring (must be a power of 2). */
+    uint16_t tx_ring_size;
+
+    /* The maximum number of packets to retrieve at a time. */
+    uint16_t burst_size;
+
+    /* Defines which ports packets will be consumed from. */
     uint32_t enabled_port_mask;
-    char* kafka_topic;
+
+    /* The name of the Kafka topic that packet data is sent to. */
+    const char* kafka_topic;
+
+    /* A file containing configuration values for the Kafka client. */
     char* kafka_config_path;
+
+    /* A file to which the Kafka stats are appended to. */
+    char* kafka_stats_path;
+
 } __rte_cache_aligned;
 
 /*
@@ -89,19 +124,10 @@ struct app_params {
  */
 struct app_params app;
 
-/*
- * Print usage information to the user.
- */
-void print_usage(const char* prgname);
-
-/*
- * Parse the 'portmask' command line argument.
- */
-int parse_portmask(const char* portmask);
-
 /**
  * Parse the command line arguments passed to the application.
  */
 int parse_args(int argc, char** argv);
 
 #endif
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/src/kafka.c
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/kafka.c b/metron-sensors/fastcapa/src/kafka.c
index 608e308..b844f3f 100644
--- a/metron-sensors/fastcapa/src/kafka.c
+++ b/metron-sensors/fastcapa/src/kafka.c
@@ -21,11 +21,113 @@
 #define POLL_TIMEOUT_MS 1000
 
 /*
- * data structures required for the kafka client
+ * Passed to all callback functions to help identify the connection.
  */
-static rd_kafka_t** kaf_h;
-static rd_kafka_topic_t** kaf_top_h;
-static int num_conns;
+struct opaque {
+    int conn_id;  
+};
+
+/*
+ * Data structures required for the kafka client
+ */
+static rd_kafka_t **kaf_h;
+static rd_kafka_topic_t **kaf_top_h;
+static unsigned num_conns;
+static FILE *stats_fd;
+static struct app_stats *kaf_conn_stats;
+static struct opaque *kaf_opaque;
+static uint64_t *kaf_keys;
+
+/*
+ * A callback executed when an error occurs within the kafka client
+ */
+static void kaf_error_cb (rd_kafka_t *rk, int err, const char *reason, void* UNUSED(opaque))
+{
+    LOG_ERROR(USER1, "kafka client unexpected error; conn=%s, error=%s [%s] \n", 
+        rd_kafka_name(rk), rd_kafka_err2str(err), reason);
+}
+
+/*
+ * A callback executed when a broker throttles the producer
+ */
+static void kaf_throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker_id, int throttle_time_ms, void* UNUSED(opaque))
+{
+    LOG_ERROR(USER1, "kafka client throttle event; conn=%s, time=%dms broker=%s broker_id=%"PRId32" \n", 
+        rd_kafka_name(rk), throttle_time_ms, broker_name, broker_id);
+}
+
+/*
+ * A callback executed on a fixed frequency (defined by `statistics.interval.ms`) 
+ * that provides detailed performance statistics
+ */
+static int kaf_stats_cb(rd_kafka_t *rk, char *json, size_t UNUSED(json_len), void *opaque) 
+{
+    int rc;
+    struct opaque *data = (struct opaque*) opaque;
+    int conn_id = data->conn_id;   
+
+    // update queue depth of this kafka connection
+    kaf_conn_stats[conn_id].depth = rd_kafka_outq_len(rk);
+
+    // write json to the stats file
+    if(NULL != stats_fd) {
+        rc = fprintf(stats_fd, "{ \"conn_id\": \"%d\", \"conn_name\": \"%s\", \"stats\": %s }\n", conn_id, rd_kafka_name(rk), json);
+        if(rc < 0) {
+            LOG_ERROR(USER1, "Unable to append to stats file \n");
+            return rc;
+        }
+        fflush(stats_fd);
+    }
+    
+    // 0 ensures the json pointer is immediately freed
+    return 0;
+}
+
+/*
+ * A callback that is called once for each message when it has been successfully
+ * produced.
+ */
+static void kaf_message_delivered_cb (rd_kafka_t *UNUSED(rk), const rd_kafka_message_t *UNUSED(rkmessage), void *opaque) 
+{
+    struct opaque *data = (struct opaque*) opaque;
+    int conn_id = data->conn_id;   
+
+    kaf_conn_stats[conn_id].out += 1;
+}
+
+/*
+ * Opens the file used to persist the stats coming out of the kafka client
+ */
+static int open_stats_file(char *filename)
+{
+    int rc;
+
+    stats_fd = fopen(filename, "a");
+    if(NULL == stats_fd) {
+        LOG_ERROR(USER1, "Unable to open stats file: file=%s, error=%s \n", filename, strerror(errno));
+        return -1;
+    }
+
+    // mark the file
+    rc = fprintf(stats_fd, "{} \n");
+    if(rc < 0) {
+       LOG_ERROR(USER1, "Unable to append to stats file \n");
+       return rc;
+    } 
+
+    fflush(stats_fd);
+    return 0;
+}
+
+/*
+ * Closes the file used to persist the kafka client stats.
+ */
+static void close_stats_file(void) 
+{
+    if(NULL != stats_fd) {
+        fclose(stats_fd);
+    }	
+}
 
 /**
  * A callback executed for each global Kafka option.
@@ -109,17 +211,35 @@ void kaf_init(int num_of_conns)
     int i;
     char errstr[512];
 
+    // open the file to which the kafka stats are appended
+    if(NULL != app.kafka_stats_path) {
+        LOG_INFO(USER1, "Appending Kafka client stats to '%s' \n", app.kafka_stats_path);
+        open_stats_file(app.kafka_stats_path);
+    }  
+
     // the number of connections to maintain
     num_conns = num_of_conns;
 
     // create kafka resources for each consumer
     kaf_h = calloc(num_of_conns, sizeof(rd_kafka_t*));
     kaf_top_h = calloc(num_of_conns, sizeof(rd_kafka_topic_t*));
-
+    kaf_conn_stats = calloc(num_of_conns, sizeof(struct app_stats));
+    kaf_opaque = calloc(num_of_conns, sizeof(struct opaque));
+    kaf_keys = calloc(num_of_conns, sizeof(uint64_t));
+    
     for (i = 0; i < num_of_conns; i++) {
 
-        // configure kafka connection; values parsed from kafka config file
+        // passed to each callback function to identify the kafka connection
+        kaf_opaque[i] = (struct opaque) { .conn_id = i };
+
         rd_kafka_conf_t* kaf_conf = rd_kafka_conf_new();
+        rd_kafka_conf_set_opaque(kaf_conf, (void *) &kaf_opaque[i]);
+        rd_kafka_conf_set_error_cb(kaf_conf, kaf_error_cb);
+        rd_kafka_conf_set_throttle_cb(kaf_conf, kaf_throttle_cb);
+        rd_kafka_conf_set_stats_cb(kaf_conf, kaf_stats_cb);
+        rd_kafka_conf_set_dr_msg_cb(kaf_conf, kaf_message_delivered_cb);
+
+        // configure kafka connection; values parsed from kafka config file
         if (NULL != app.kafka_config_path) {
             parse_kafka_config(app.kafka_config_path, "kafka-global", kaf_global_option, (void*)kaf_conf);
         }
@@ -144,23 +264,72 @@ void kaf_init(int num_of_conns)
     }
 }
 
+/*
+ * Executes polling across all of the kafka client connections.  Ensures that any queued
+ * callbacks are served.
+ */
+void kaf_poll(void) 
+{
+    unsigned i;
+    for (i = 0; i < num_conns; i++) {
+        rd_kafka_poll(kaf_h[i], POLL_TIMEOUT_MS);
+    }
+}
+
+/**
+ * Retrieves a summary of statistics across all of the kafka client connections.
+ */
+int kaf_stats(struct app_stats *stats)
+{
+    unsigned i;
+    uint64_t in, out, depth, drops;
+
+    in = out = depth = drops = 0;
+    for (i = 0; i < num_conns; i++) {
+        in += kaf_conn_stats[i].in;
+        out += kaf_conn_stats[i].out;
+        depth += kaf_conn_stats[i].depth;
+        drops += kaf_conn_stats[i].drops;
+    }
+
+    stats->in = in;
+    stats->out = out;
+    stats->depth = depth;
+    stats->drops = drops;
+
+    return 0;
+}
+
 /**
  * Closes the pool of Kafka connections.
  */
 void kaf_close(void)
 {
-    int i;
+    unsigned i;
+
+    LOG_INFO(USER1, "Closing all Kafka connections \n");
     for (i = 0; i < num_conns; i++) {
+        LOG_INFO(USER1, "'%d' message(s) queued on %s \n", rd_kafka_outq_len(kaf_h[i]), rd_kafka_name(kaf_h[i]));
+    }
+
+    for (i = 0; i < num_conns; i++) {
+
         // wait for messages to be delivered
         while (rd_kafka_outq_len(kaf_h[i]) > 0) {
-            LOG_INFO(USER1, "waiting for %d messages to clear on conn [%i/%i]",
-                rd_kafka_outq_len(kaf_h[i]), i + 1, num_conns);
+            LOG_INFO(USER1, "Waiting for '%d' message(s) on %s \n", rd_kafka_outq_len(kaf_h[i]), rd_kafka_name(kaf_h[i]));
             rd_kafka_poll(kaf_h[i], POLL_TIMEOUT_MS);
         }
 
+        LOG_INFO(USER1, "All messages cleared on %s \n", rd_kafka_name(kaf_h[i]));
+        rd_kafka_flush(kaf_h[i], POLL_TIMEOUT_MS);
         rd_kafka_topic_destroy(kaf_top_h[i]);
         rd_kafka_destroy(kaf_h[i]);
     }
+
+    free(kaf_conn_stats);
+    free(kaf_opaque);
+    free(kaf_keys);
+    close_stats_file();
 }
 
 /**
@@ -176,49 +345,39 @@ static uint64_t current_time(void)
 /**
  * Publish a set of packets to a kafka topic.
  */
-int kaf_send(struct rte_mbuf* data, int pkt_count, int conn_id)
+int kaf_send(struct rte_mbuf* pkts[], int pkt_count, int conn_id)
 {
     // unassigned partition
     int partition = RD_KAFKA_PARTITION_UA;
     int i;
     int pkts_sent = 0;
-    int drops;
     rd_kafka_message_t kaf_msgs[pkt_count];
 
-    // TODO: ensure that librdkafka cleans this up for us
-    uint64_t *now = malloc(sizeof(uint64_t));
-
-    // the current time in microseconds from the epoch (in big-endian aka network
-    // byte order) is added as a message key before being sent to kafka
-    *now = htobe64(current_time());
-
     // find the topic connection based on the conn_id
     rd_kafka_topic_t* kaf_topic = kaf_top_h[conn_id];
 
+    // the current time in microseconds from the epoch (big-endian, aka network
+    // byte order) is added as a message key before being sent to kafka
+    kaf_keys[conn_id] = htobe64(current_time());
+    
     // create the batch message for kafka
     for (i = 0; i < pkt_count; i++) {
         kaf_msgs[i].err = 0;
         kaf_msgs[i].rkt = kaf_topic;
         kaf_msgs[i].partition = partition;
-        kaf_msgs[i].payload = rte_ctrlmbuf_data(&data[i]);
-        kaf_msgs[i].len = rte_ctrlmbuf_len(&data[i]);
-        kaf_msgs[i].key = (void*) now;
+        kaf_msgs[i].payload = rte_ctrlmbuf_data(pkts[i]);
+        kaf_msgs[i].len = rte_ctrlmbuf_len(pkts[i]);
+        kaf_msgs[i].key = (void*) &kaf_keys[conn_id];
         kaf_msgs[i].key_len = sizeof(uint64_t);
         kaf_msgs[i].offset = 0;
     }
 
     // hand all of the messages off to kafka
-    pkts_sent = rd_kafka_produce_batch(kaf_topic, partition, RD_KAFKA_MSG_F_COPY, kaf_msgs, pkt_count);
-
-    // did we drop packets?
-    drops = pkt_count - pkts_sent;
-    if (drops > 0) {
-        for (i = 0; i < pkt_count; i++) {
-            if (!kaf_msgs[i].err) {
-                LOG_ERROR(USER1, "'%d' packets dropped, first error: %s", drops, (char*)kaf_msgs[i].payload);
-            }
-        }
-    }
+    pkts_sent = rd_kafka_produce_batch(kaf_topic, partition, 0, kaf_msgs, pkt_count);
+
+    // update stats
+    kaf_conn_stats[conn_id].in += pkt_count;
+    kaf_conn_stats[conn_id].drops += (pkt_count - pkts_sent);
 
     return pkts_sent;
 }
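
Since kaf_send() attaches the capture time as an 8-byte big-endian message key,
a downstream consumer can recover the timestamp by reversing the htobe64() call
above. A minimal consumer-side sketch; the function is illustrative and not part
of this commit:

    #include <endian.h>
    #include <stdint.h>
    #include <string.h>

    /* given a kafka message key, recover the epoch time in microseconds */
    static uint64_t key_to_epoch_micros(const void *key, size_t key_len)
    {
        uint64_t be = 0;
        if (key_len == sizeof(uint64_t)) {
            memcpy(&be, key, sizeof(be));  /* memcpy avoids unaligned reads */
        }
        return be64toh(be);  /* big-endian (network order) back to host order */
    }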

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/src/kafka.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/kafka.h b/metron-sensors/fastcapa/src/kafka.h
index 8b6bc78..ab62ad7 100644
--- a/metron-sensors/fastcapa/src/kafka.h
+++ b/metron-sensors/fastcapa/src/kafka.h
@@ -19,23 +19,43 @@
 #ifndef METRON_KAFKA_H
 #define METRON_KAFKA_H
 
+#include <stdio.h>
 #include <string.h>
 #include <sys/time.h>
 #include <endian.h>
 #include <librdkafka/rdkafka.h>
 #include <rte_common.h>
 #include <rte_mbuf.h>
+
 #include "args.h"
+#include "types.h"
+
+#ifdef __GNUC__
+#  define UNUSED(x) UNUSED_ ## x __attribute__((__unused__))
+#else
+#  define UNUSED(x) UNUSED_ ## x
+#endif
 
 /**
  * Initializes a pool of Kafka connections.
  */
 void kaf_init(int num_of_conns);
 
 /**
  * Publish a set of packets to a kafka topic.
  */
-int kaf_send(struct rte_mbuf* data, int num_to_send, int conn_id);
+int kaf_send(struct rte_mbuf* data[], int num_to_send, int conn_id);
+
+/*
+ * Executes polling across all of the kafka client connections.  Ensures that any queued
+ * callbacks are served.
+ */
+void kaf_poll(void);
+
+/**
+ * Retrieves a summary of statistics across all of the kafka client connections.
+ */
+int kaf_stats(struct app_stats *stats);
 
 /**
  * Closes the pool of Kafka connections.
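
Taken together, the header implies a simple lifecycle for callers; a hedged
sketch of the expected call order (the packet source and connection count are
placeholders, not part of this commit):

    #include "kafka.h"

    static void kaf_example(struct rte_mbuf *pkts[], int n)
    {
        struct app_stats totals;

        kaf_init(2);           /* one client connection per transmit worker */
        kaf_send(pkts, n, 0);  /* publish a burst of packets on connection 0 */
        kaf_poll();            /* serve any queued delivery/stats callbacks */
        kaf_stats(&totals);    /* aggregate stats across all connections */
        kaf_close();           /* drain outstanding messages and tear down */
    }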

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/src/main.c
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/main.c b/metron-sensors/fastcapa/src/main.c
index 87a385e..c4c5123 100644
--- a/metron-sensors/fastcapa/src/main.c
+++ b/metron-sensors/fastcapa/src/main.c
@@ -22,42 +22,57 @@
  * Initialize a port using global settings and with the rx buffers
  * coming from the mbuf_pool passed as parameter
  */
-static inline int init_port(uint8_t port, struct rte_mempool* mbuf_pool)
+static inline int init_port(const uint8_t port, struct rte_mempool* mbuf_pool)
 {
     struct rte_eth_conf port_conf = port_conf_default;
     int retval;
     uint16_t q;
-    const uint16_t rxRings = 1;
-    const uint16_t txRings = 1;
-    int socket = rte_eth_dev_socket_id(port);
+    int retry = 5;
+    const uint16_t tx_queues = 1;
+    int socket;
+    struct rte_eth_dev_info dev_info;
 
     if (port >= rte_eth_dev_count()) {
-        rte_exit(EXIT_FAILURE, "Port %" PRIu8 " does not exist; only %u known port(s)",
-            port, rte_eth_dev_count());
+        rte_exit(EXIT_FAILURE, "Port does not exist; port=%u \n", port);
         return -1;
     }
 
-    retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
+    // check that the number of RX queues does not exceed what is supported by the device
+    rte_eth_dev_info_get(port, &dev_info);
+    if (app.nb_rx_queue > dev_info.max_rx_queues) {
+        rte_exit(EXIT_FAILURE, "Too many RX queues for device; port=%u, rx_queues=%u, max_queues=%u \n", 
+            port, app.nb_rx_queue, dev_info.max_rx_queues);
+        return -EINVAL;
+    }
+
+    // check that the number of TX queues does not exceed what is supported by the device
+    if (tx_queues > dev_info.max_tx_queues) {
+        rte_exit(EXIT_FAILURE, "Too many TX queues for device; port=%u, tx_queues=%u, max_queues=%u \n", 
+            port, tx_queues, dev_info.max_tx_queues);
+        return -EINVAL;
+    }
+
+    retval = rte_eth_dev_configure(port, app.nb_rx_queue, tx_queues, &port_conf);
     if (retval != 0) {
-        rte_exit(EXIT_FAILURE, "Unable to configure port %" PRIu8 "\n", port);
+        rte_exit(EXIT_FAILURE, "Cannot configure device; port=%u, err=%s \n", port, strerror(-retval));
         return retval;
     }
 
-    // setup the receive rings
-    for (q = 0; q < rxRings; q++) {
-        retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE, socket, NULL, mbuf_pool);
-        if (retval < 0) {
-            rte_exit(EXIT_FAILURE, "Unable to setup rx queue on port %" PRIu8 "\n", port);
+    // create the receive queues
+    socket = rte_eth_dev_socket_id(port);
+    for (q = 0; q < app.nb_rx_queue; q++) {
+        retval = rte_eth_rx_queue_setup(port, q, app.nb_rx_desc, socket, NULL, mbuf_pool);
+        if (retval != 0) {
+            rte_exit(EXIT_FAILURE, "Cannot setup RX queue; port=%u, err=%s \n", port, strerror(-retval));
             return retval;
         }
     }
 
-    // setup the transmit rings
-    for (q = 0; q < txRings; q++) {
-        retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE, socket, NULL);
-
-        if (retval < 0) {
-            rte_exit(EXIT_FAILURE, "Unable to setup rx queue on port %" PRIu8 "\n", port);
+    // create the transmit queues - at least one TX queue must be setup even though we don't use it
+    for (q = 0; q < tx_queues; q++) {
+        retval = rte_eth_tx_queue_setup(port, q, TX_QUEUE_SIZE, socket, NULL);
+        if (retval != 0) {
+            rte_exit(EXIT_FAILURE, "Cannot setup TX queue; port=%u, err=%s \n", port, strerror(-retval));
             return retval;
         }
     }
@@ -65,65 +80,134 @@ static inline int init_port(uint8_t port, struct rte_mempool* mbuf_pool)
     // start the receive and transmit units on the device
     retval = rte_eth_dev_start(port);
     if (retval < 0) {
-        rte_exit(EXIT_FAILURE, "Unable to start device on port %" PRIu8 "\n", port);
+        rte_exit(EXIT_FAILURE, "Cannot start device; port=%u, err=%s \n", port, strerror(-retval));
         return retval;
     }
 
     // retrieve information about the device
     struct rte_eth_link link;
-    rte_eth_link_get_nowait(port, &link);
-    if (!link.link_status) {
-        sleep(1);
+    do {
         rte_eth_link_get_nowait(port, &link);
-    }
+
+    } while (retry-- > 0 && !link.link_status && !sleep(1));
 
     // if still no link information, must be down
     if (!link.link_status) {
-        rte_exit(EXIT_FAILURE, "Link down on port %" PRIu8 "\n", port);
+        rte_exit(EXIT_FAILURE, "Link down; port=%u \n", port);
         return 0;
     }
 
+    // enable promisc mode
+    rte_eth_promiscuous_enable(port);
+
     // print diagnostics
     struct ether_addr addr;
     rte_eth_macaddr_get(port, &addr);
-    LOG_INFO(USER1, "Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+    LOG_INFO(USER1, "Device setup successfully; port=%u, mac=%02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
         (unsigned)port,
         addr.addr_bytes[0], addr.addr_bytes[1],
         addr.addr_bytes[2], addr.addr_bytes[3],
         addr.addr_bytes[4], addr.addr_bytes[5]);
 
-    // enable promisc mode
-    rte_eth_promiscuous_enable(port);
     return 0;
 }
 
-static void quit_workers(struct rte_distributor* d, struct rte_mempool* p, unsigned num_workers)
+static void print_stats(struct rx_worker_params *rx_params, unsigned nb_rx_workers, struct tx_worker_params *tx_params, unsigned nb_tx_workers)
 {
+    struct rte_eth_stats eth_stats;
     unsigned i;
-    struct rte_mbuf* bufs[num_workers];
-    rte_mempool_get_bulk(p, (void*)bufs, num_workers);
+    uint64_t in, out, depth, drops;
+    struct app_stats stats;
+
+    // header
+    printf("\n\n     %15s %15s %15s %15s \n", " ----- in -----", " --- queued ---", "----- out -----", "---- drops ----");
+
+    // summarize stats from each port
+    in = 0;
+    for (i = 0; i < rte_eth_dev_count(); i++) {
+        rte_eth_stats_get(i, &eth_stats);
+        in += eth_stats.ipackets;
+    }
+    printf("[nic] %15" PRIu64 " %15s %15s %15s \n", in, "-", "-", "-");
+
+    // summarize receive; from network to receive queues
+    in = out = depth = drops = 0;
+    for (i = 0; i < nb_rx_workers; i++) {
+        in += rx_params[i].stats.in;
+        out += rx_params[i].stats.out;
+        depth += rx_params[i].stats.depth;
+        drops += rx_params[i].stats.drops;
+    }
+    printf("[rx]  %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", out, drops);
+
+    // summarize transmit; from receive queues to transmit rings
+    in = out = depth = 0;
+    for (i = 0; i < nb_tx_workers; i++) {
+        in += tx_params[i].stats.in;
+        out += tx_params[i].stats.out;
+        depth += tx_params[i].stats.depth;
+    }
+    printf("[tx]  %15" PRIu64 " %15s %15" PRIu64 " %15" PRIu64 "\n", in, "-", out, in - out);
+
+    // summarize push to kafka; from transmit rings to librdkafka
+    kaf_stats(&stats);
+    printf("[kaf] %15" PRIu64 " %15" PRIu64 " %15" PRIu64 " %15" PRIu64 "\n", stats.in, stats.depth, stats.out, stats.drops);
 
-    for (i = 0; i < num_workers; i++) {
-        bufs[i]->hash.rss = i << 1;
+    // summarize any errors on the ports
+    for (i = 0; i < rte_eth_dev_count(); i++) {
+        rte_eth_stats_get(i, &eth_stats);
+
+        if(eth_stats.ierrors > 0 || eth_stats.oerrors > 0 || eth_stats.rx_nombuf > 0) {
+            printf("\nErrors: Port %u \n", i);
+            printf(" - In Errs:   %" PRIu64 "\n", eth_stats.ierrors);
+            printf(" - Out Errs:  %" PRIu64 "\n", eth_stats.oerrors);
+            printf(" - Mbuf Errs: %" PRIu64 "\n", eth_stats.rx_nombuf);
+        }
+    }
+}
+
+/*
+ * Handles interrupt signals.
+ */
+static void sig_handler(int sig_num)
+{
+    LOG_INFO(USER1, "Exiting on signal '%d'\n", sig_num);
+
+    // set quit flag so that all workers exit
+    quit_signal = 1;
+}
+
+static int monitor_workers(struct rx_worker_params *rx_params, unsigned nb_rx_workers, struct tx_worker_params *tx_params, unsigned nb_tx_workers)
+{
+    LOG_INFO(USER1, "Starting to monitor workers; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id());
+    while (!quit_signal) {
+        kaf_poll();
+        print_stats(rx_params, nb_rx_workers, tx_params, nb_tx_workers);
+        sleep(5);
     }
 
-    rte_distributor_process(d, bufs, num_workers);
-    rte_mempool_put_bulk(p, (void*)bufs, num_workers);
+    LOG_INFO(USER1, "Finished monitoring workers; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id());
+    return 0;
 }
 
 /**
- * Master distribution logic that receives a packet and distributes it to a
- * worker.
+ * Process packets from a single queue.
  */
-static int receive_packets(struct lcore_params* p)
+static int receive_worker(struct rx_worker_params* params)
 {
-    struct rte_distributor* d = p->d;
-    struct rte_mempool* mem_pool = p->mem_pool;
     const uint8_t nb_ports = rte_eth_dev_count();
-    const int socket_id = rte_socket_id();
+    const unsigned socket_id = rte_socket_id();
+    const uint16_t burst_size = app.burst_size;
+    const uint16_t queue_id = params->queue_id;
+    struct rte_ring *ring = params->output_ring;
+    int i, dev_socket_id;
     uint8_t port;
+    struct rte_mbuf* pkts[MAX_BURST_SIZE];
+    const int attempts = MAX_BURST_SIZE / burst_size;
+
+    LOG_INFO(USER1, "Receive worker started; core=%u, socket=%u, queue=%u, attempts=%d \n", rte_lcore_id(), socket_id, queue_id, attempts);
 
-    // check for cross-socket communication
+    // validate each port
     for (port = 0; port < nb_ports; port++) {
 
         // skip ports that are not enabled
@@ -131,14 +215,15 @@ static int receive_packets(struct lcore_params* p)
             continue;
         }
 
-        if (rte_eth_dev_socket_id(port) > 0 && rte_eth_dev_socket_id(port) != socket_id) {
-            LOG_WARN(USER1, "Warning: Port %u on different socket from thread; performance will suffer\n", port);
+        // check for cross-socket communication
+        dev_socket_id = rte_eth_dev_socket_id(port);
+        if (dev_socket_id >= 0 && ((unsigned) dev_socket_id) != socket_id) {
+            LOG_WARN(USER1, "Warning: Port %u on different socket from worker; performance will suffer\n", port);
         }
     }
 
-    LOG_INFO(USER1, "Core %u doing packet receive and distribution.\n", rte_lcore_id());
     port = 0;
-    while (!quit_signal_rx) {
+    while (!quit_signal) {
 
         // skip to the next enabled port
         if ((app.enabled_port_mask & (1 << port)) == 0) {
@@ -148,89 +233,72 @@ static int receive_packets(struct lcore_params* p)
             continue;
         }
 
-        // receive a 'burst' of many packets
-        struct rte_mbuf* bufs[BURST_SIZE * 2];
-        const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);
-        app_stats.rx.received_pkts += nb_rx;
-
-        // distribute the packets amongst all workers
-        rte_distributor_process(d, bufs, nb_rx);
-
-        // track packets completed by the workers
-        const uint16_t nb_ret = rte_distributor_returned_pkts(d, bufs, BURST_SIZE * 2);
-        app_stats.rx.enqueued_pkts += nb_ret;
-        if (unlikely(nb_ret == 0)) {
-            continue;
+        // receive a 'burst' of packets. if we get back the max number requested,
+        // more packets are likely waiting, so immediately go back and grab some.
+        i = 0;
+        uint16_t nb_in = 0, nb_in_last = 0;
+        do {
+            nb_in_last = rte_eth_rx_burst(port, queue_id, &pkts[nb_in], burst_size);
+            nb_in += nb_in_last;
+
+        } while (++i < attempts && nb_in_last == burst_size);
+        params->stats.in += nb_in;
+
+        // add each packet to the ring buffer
+        uint16_t nb_out = 0;
+        if (likely(nb_in > 0)) {
+            nb_out = rte_ring_enqueue_burst(ring, (void *) pkts, nb_in);
+            params->stats.out += nb_out;
+            params->stats.drops += (nb_in - nb_out);
         }
 
+        // free only the packets that could not be enqueued; the transmit
+        // worker releases those that were handed off on the ring
+        for (i = nb_out; i < nb_in; i++) {
+            rte_pktmbuf_free(pkts[i]);
+        }
+        
         // wrap-around to the first port
         if (++port == nb_ports) {
             port = 0;
         }
     }
 
-    // flush distributor process
-    rte_distributor_process(d, NULL, 0);
-    rte_distributor_flush(d);
-
-    // notify workers that it is quitting time
-    quit_signal = 1;
-    quit_workers(d, mem_pool, p->num_workers);
-
+    LOG_INFO(USER1, "Receive worker finished; core=%u, socket=%u, queue=%u \n", rte_lcore_id(), socket_id, queue_id);
     return 0;
 }
 
-/*
- * Send packets to a Kafka broker.
+/**
+ * Dequeues packets from a transmit ring and publishes them to a Kafka topic.
  */
-static int send_packets(struct lcore_params* p)
+static int transmit_worker(struct tx_worker_params *params)
 {
-    struct rte_distributor* d = p->d;
-    const unsigned id = p->worker_id;
-    struct rte_mbuf* buf = NULL;
+    unsigned i, nb_in, nb_out;
+    const uint16_t burst_size = params->burst_size;
+    struct rte_ring *ring = params->input_ring;
+    const int kafka_id = params->kafka_id;
 
-    LOG_INFO(USER1, "Core %u is a worker core.\n", rte_lcore_id());
+    LOG_INFO(USER1, "Transmit worker started; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id());
     while (!quit_signal) {
-        buf = rte_distributor_get_pkt(d, id, buf);
-
-        LOG_DEBUG(USER1, "packet received; core = %u, pkt_len = %u, data_len = %u \n",
-            rte_lcore_id(), buf->pkt_len, buf->data_len);
 
-        kaf_send(buf, 1, 0);
-    }
-    return 0;
-}
+        // dequeue packets from the ring
+        struct rte_mbuf* pkts[MAX_BURST_SIZE];
+        nb_in = rte_ring_dequeue_burst(ring, (void*) pkts, burst_size);
+        
+        if(likely(nb_in > 0)) {
+            params->stats.in += nb_in;
 
-static void print_stats(void)
-{
-    struct rte_eth_stats eth_stats;
-    unsigned i;
-
-    printf("\nThread stats:\n");
-    printf(" - Received:    %" PRIu64 "\n", app_stats.rx.received_pkts);
-    printf(" - Queued:      %" PRIu64 "\n", app_stats.rx.enqueued_pkts);
-    printf(" - Sent:        %" PRIu64 "\n", app_stats.rx.sent_pkts);
+            // publish the packets to kafka
+            nb_out = kaf_send(pkts, nb_in, kafka_id);
+            params->stats.out += nb_out;
+        }
 
-    for (i = 0; i < rte_eth_dev_count(); i++) {
-        rte_eth_stats_get(i, &eth_stats);
-        printf("\nPort %u stats:\n", i);
-        printf(" - Pkts in:   %" PRIu64 "\n", eth_stats.ipackets);
-        printf(" - Pkts out:  %" PRIu64 "\n", eth_stats.opackets);
-        printf(" - In Errs:   %" PRIu64 "\n", eth_stats.ierrors);
-        printf(" - Out Errs:  %" PRIu64 "\n", eth_stats.oerrors);
-        printf(" - Mbuf Errs: %" PRIu64 "\n", eth_stats.rx_nombuf);
+        // clean-up the packet buffer    
+        for (i = 0; i < nb_in; i++) {
+            rte_pktmbuf_free(pkts[i]);
+        }
     }
-}
 
-/*
- * Handles interrupt signals.
- */
-static void sig_handler(int sig_num)
-{
-    LOG_INFO(USER1, "Exiting on signal '%d'\n", sig_num);
-
-    // set quit flag for rx thread to exit
-    quit_signal_rx = 1;
+    LOG_INFO(USER1, "Transmit worker finished; core=%u, socket=%u \n", rte_lcore_id(), rte_socket_id());
+    return 0;
 }
 
 /**
@@ -239,14 +307,15 @@ static void sig_handler(int sig_num)
 int main(int argc, char* argv[])
 {
     unsigned lcore_id;
-    unsigned nb_ports;
-    unsigned worker_id = 0;
     unsigned nb_workers;
     uint8_t port_id;
-    uint8_t nb_ports_available;
-
+    unsigned nb_ports;
+    unsigned nb_ports_available;
     struct rte_mempool* mbuf_pool;
-    struct rte_distributor* d;
+    unsigned n, i;
+    unsigned nb_rx_workers, nb_tx_workers;
+    unsigned rx_worker_id = 0, tx_worker_id = 0;
+    char buf[32];
 
     // catch interrupt
     signal(SIGINT, sig_handler);
@@ -254,7 +323,7 @@ int main(int argc, char* argv[])
     // initialize the environment
     int ret = rte_eal_init(argc, argv);
     if (ret < 0) {
-        rte_exit(EXIT_FAILURE, "Error: Problem during initialization: %i\n", ret);
+        rte_exit(EXIT_FAILURE, "Failed to initialize EAL: %i\n", ret);
     }
 
     // advance past the environmental settings
@@ -264,30 +333,29 @@ int main(int argc, char* argv[])
     // parse arguments to the application
     ret = parse_args(argc, argv);
     if (ret < 0) {
-        rte_exit(EXIT_FAILURE, "Error: Invalid parameters\n");
-    }
-
-    // check number of ethernet devices
-    nb_ports = rte_eth_dev_count();
-    if (nb_ports == 0) {
-        rte_exit(EXIT_FAILURE, "Error: No ethernet ports detected\n");
+        rte_exit(EXIT_FAILURE, "Invalid parameters\n");
     }
 
-    // check number of available logical cores for workers
     nb_workers = rte_lcore_count() - 1;
-    if (nb_workers < 1) {
-        rte_exit(EXIT_FAILURE, "Error: Minimum 2 logical cores required. \n");
+    nb_ports_available = nb_ports = rte_eth_dev_count();
+    nb_rx_workers = app.nb_rx_queue;
+    nb_tx_workers = nb_workers - nb_rx_workers;
+    n = NUM_MBUFS * nb_ports;
+
+    // validate the number of workers
+    if(nb_tx_workers < nb_rx_workers) {
+        rte_exit(EXIT_FAILURE, "Additional lcore(s) required; found=%u, required=%u \n", 
+            rte_lcore_count(), (app.nb_rx_queue*2) + 1);
     }
 
     // create memory pool
-    mbuf_pool = rte_pktmbuf_pool_create("mbuf-pool", NUM_MBUFS * nb_ports,
-        MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
+    mbuf_pool = rte_pktmbuf_pool_create("mbuf-pool", n, MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
     if (mbuf_pool == NULL) {
-        rte_exit(EXIT_FAILURE, "Error: Cannot create memory pool for packets\n");
+        rte_exit(EXIT_FAILURE, "Unable to create memory pool; n=%u, cache_size=%u, data_room_size=%u, socket=%u \n", 
+            n, MBUF_CACHE_SIZE, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
     }
 
     // initialize each of the specified ethernet ports
-    nb_ports_available = nb_ports;
     for (port_id = 0; port_id < nb_ports; port_id++) {
 
         // skip over ports that are not enabled
@@ -297,7 +365,7 @@ int main(int argc, char* argv[])
             continue;
         }
 
-        // initialize the port
+        // initialize the port - creates one receive queue for each worker
         LOG_INFO(USER1, "Initializing port %u\n", (unsigned)port_id);
         if (init_port(port_id, mbuf_pool) != 0) {
             rte_exit(EXIT_FAILURE, "Cannot initialize port %" PRIu8 "\n", port_id);
@@ -305,45 +373,71 @@ int main(int argc, char* argv[])
     }
 
     // ensure that we were able to initialize enough ports
-    if (!nb_ports_available) {
+    if (nb_ports_available < 1) {
         rte_exit(EXIT_FAILURE, "Error: No available enabled ports. Portmask set?\n");
     }
 
-    kaf_init(1);
+    // each transmit worker has its own kafka client connection
+    kaf_init(nb_tx_workers);
+
+    struct rx_worker_params rx_params[nb_rx_workers];
+    struct tx_worker_params tx_params[nb_tx_workers];
 
-    // the distributor will dispatch packets to 1 or more workers
-    d = rte_distributor_create("master", rte_socket_id(), nb_workers);
-    if (d == NULL) {
-        rte_exit(EXIT_FAILURE, "Error: Unable to create distributor\n");
+    // create the transmit rings - 1 for each receive queue
+    struct rte_ring *tx_rings[app.nb_rx_queue]; 
+    for(i = 0; i < app.nb_rx_queue; i++) {
+        snprintf(buf, sizeof(buf), "tx-ring-%u", i);
+        tx_rings[i] = rte_ring_create(buf, app.tx_ring_size, rte_socket_id(), 0);
+        if(NULL == tx_rings[i]) {
+            rte_exit(EXIT_FAILURE, "Unable to create transmit ring: %s \n", rte_strerror(rte_errno));
+        }
     }
 
-    // launch workers on each logical core
+    // launch the workers
     RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 
-        struct lcore_params* p = rte_malloc(NULL, sizeof(*p), 0);
-        if (!p) {
-            rte_panic("Error: rte_malloc failure\n");
+        if(rx_worker_id < nb_rx_workers) {
+
+            LOG_INFO(USER1, "Launching receive worker; worker=%u, core=%u, queue=%u\n", rx_worker_id, lcore_id, rx_worker_id);
+            rx_params[rx_worker_id] = (struct rx_worker_params) { 
+                .worker_id = rx_worker_id, 
+                .queue_id = rx_worker_id, 
+                .burst_size = app.burst_size, 
+                .output_ring = tx_rings[rx_worker_id], 
+                .stats = {0} 
+            };
+            rte_eal_remote_launch((lcore_function_t*) receive_worker, &rx_params[rx_worker_id], lcore_id);
+            rx_worker_id++;
+
+        } else {
+
+            unsigned ring_id = tx_worker_id % app.nb_rx_queue;
+            LOG_INFO(USER1, "Launching transmit worker; worker=%u, core=%u ring=%u \n", tx_worker_id, lcore_id, ring_id);
+            tx_params[tx_worker_id] = (struct tx_worker_params) { 
+                .worker_id = tx_worker_id, 
+                .burst_size = app.burst_size, 
+                .input_ring = tx_rings[ring_id], 
+                .kafka_id = tx_worker_id,
+                .stats = {0} 
+            };
+            rte_eal_remote_launch((lcore_function_t*) transmit_worker, &tx_params[tx_worker_id], lcore_id);
+            tx_worker_id++;
         }
 
-        // launch the worker process
-        LOG_INFO(USER1, "Launching worker on core %u\n", lcore_id);
-        *p = (struct lcore_params){ worker_id, nb_workers, d, mbuf_pool };
-        rte_eal_remote_launch((lcore_function_t*)send_packets, p, lcore_id);
-
-        worker_id++;
     }
 
-    // start distributing packets on the master
-    struct lcore_params p = { 0, nb_workers, d, mbuf_pool };
-    receive_packets(&p);
+    // allow the master to monitor each of the workers
+    monitor_workers(rx_params, nb_rx_workers, tx_params, nb_tx_workers);
 
     // wait for each worker to complete
     RTE_LCORE_FOREACH_SLAVE(lcore_id) {
         if (rte_eal_wait_lcore(lcore_id) < 0) {
+            LOG_WARN(USER1, "Failed to wait for worker; lcore=%u \n", lcore_id);
             return -1;
         }
     }
 
-    print_stats();
+    kaf_close();
     return 0;
 }
+
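
One detail worth calling out in receive_worker(): the nested burst loop is
bounded so that the on-stack pkts[] array cannot overflow. With the defaults,
attempts = MAX_BURST_SIZE / burst_size = 1024 / 32 = 32, so at most 32 full
bursts (1024 mbufs) accumulate before the ring enqueue. A standalone sketch of
the same pattern, with rx_burst standing in for rte_eth_rx_burst():

    #include <stdint.h>

    #define MAX_BURST_SIZE 1024

    /* drain a source in bursts; returns the count fetched, <= MAX_BURST_SIZE */
    static uint16_t drain(uint16_t (*rx_burst)(void **buf, uint16_t n),
                          void *buf[], uint16_t burst_size)
    {
        const int attempts = MAX_BURST_SIZE / burst_size;
        uint16_t total = 0, last;
        int i = 0;

        do {
            last = rx_burst(&buf[total], burst_size);
            total += last;
        } while (++i < attempts && last == burst_size);  /* stop on a partial burst */

        return total;
    }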

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/81677fd9/metron-sensors/fastcapa/src/main.h
----------------------------------------------------------------------
diff --git a/metron-sensors/fastcapa/src/main.h b/metron-sensors/fastcapa/src/main.h
index abaf82a..0a230a5 100644
--- a/metron-sensors/fastcapa/src/main.h
+++ b/metron-sensors/fastcapa/src/main.h
@@ -61,33 +61,24 @@
 #include <rte_string_fns.h>
 #include <rte_distributor.h>
 #include <rte_malloc.h>
+#include <rte_errno.h>
 
 #include "args.h"
 #include "kafka.h"
+#include "types.h"
 
-#define RX_RING_SIZE 256
-#define TX_RING_SIZE 512
+/*
+ * the number of receive queue descriptors is a multiple of the packet burst size
+ */
+#define RX_QUEUE_MULT 4 
+#define TX_QUEUE_SIZE 32
 #define NUM_MBUFS ((64 * 1024) - 1)
 #define MBUF_CACHE_SIZE 250
-#define BURST_SIZE 32
-#define RTE_RING_SZ 1024
 
 // uncomment below line to enable debug logs
 //#define DEBUG
 
 volatile uint8_t quit_signal;
-volatile uint8_t quit_signal_rx;
-
-/**
- * Tracks packet processing stats.
- */
-volatile struct app_stats {
-    struct {
-        uint64_t received_pkts;
-        uint64_t enqueued_pkts;
-        uint64_t sent_pkts;
-    } rx __rte_cache_aligned;
-} app_stats;
 
 /**
  * Default port configuration settings.
@@ -96,6 +87,8 @@ const struct rte_eth_conf port_conf_default = {
     .rxmode = {
         .mq_mode = ETH_MQ_RX_RSS,
         .max_rx_pkt_len = ETHER_MAX_LEN,
+        .enable_scatter = 1,
+        .enable_lro = 1
     },
     .txmode = {
         .mq_mode = ETH_MQ_TX_NONE,
@@ -107,16 +100,7 @@ const struct rte_eth_conf port_conf_default = {
     },
 };
 
-/**
- * Configuration parameters provided to each worker.
- */
-struct lcore_params {
-    unsigned worker_id;
-    unsigned num_workers;
-    struct rte_distributor* d;
-    struct rte_mempool* mem_pool;
-} __rte_cache_aligned;
-
 int main(int argc, char* argv[]);
 
 #endif
+
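
A final note on sizing: args.h documents that the transmit ring size must be a
power of two, which rte_ring_create() enforces. A small sketch of a guard that
could sit beside the other argument checks (illustrative only, not part of this
commit):

    #include <stdint.h>

    /* returns 1 if x is a non-zero power of two, e.g. the default of 2048 */
    static int is_power_of_two(uint32_t x)
    {
        return x != 0 && (x & (x - 1)) == 0;
    }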