Posted to commits@metron.apache.org by ce...@apache.org on 2017/05/17 14:32:57 UTC
metron git commit: METRON-936: Fixes to pcap for performance and testing (mmiklavc via cestella) closes apache/incubator-metron#585
Repository: metron
Updated Branches:
refs/heads/master be0307659 -> c0b082523
METRON-936: Fixes to pcap for performance and testing (mmiklavc via cestella) closes apache/incubator-metron#585
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c0b08252
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c0b08252
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c0b08252
Branch: refs/heads/master
Commit: c0b0825235ce5c26649282c43f3c57f092864f94
Parents: be03076
Author: mmiklavc <mi...@gmail.com>
Authored: Wed May 17 10:32:46 2017 -0400
Committer: cstella <ce...@gmail.com>
Committed: Wed May 17 10:32:46 2017 -0400
----------------------------------------------------------------------
metron-platform/metron-pcap-backend/README.md | 198 +++++++++++++++++++
.../src/main/config/pcap.properties | 10 +-
.../src/main/flux/pcap/remote.yaml | 14 +-
.../metron/spout/pcap/HDFSWriterCallback.java | 76 ++++---
.../metron/spout/pcap/HDFSWriterConfig.java | 47 +++++
.../metron/spout/pcap/PartitionHDFSWriter.java | 61 ++++--
.../pcap/deserializer/FromKeyDeserializer.java | 22 +--
.../deserializer/FromPacketDeserializer.java | 16 +-
.../pcap/deserializer/KeyValueDeserializer.java | 15 +-
.../org/apache/metron/utils/PcapInspector.java | 29 +--
.../PcapTopologyIntegrationTest.java | 9 +-
.../deserializer/FromKeyDeserializerTest.java | 39 ++++
.../java/org/apache/metron/pcap/PcapHelper.java | 53 ++++-
.../java/org/apache/metron/pcap/mr/PcapJob.java | 19 +-
14 files changed, 505 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md
index 5b554b5..da19611 100644
--- a/metron-platform/metron-pcap-backend/README.md
+++ b/metron-platform/metron-pcap-backend/README.md
@@ -4,6 +4,16 @@ The purpose of the Metron PCAP backend is to create a storm topology
capable of rapidly ingesting raw packet capture data directly into HDFS
from Kafka.
+* [Sensors](#the-sensors-feeding-kafka)
+* [PCAP Topology](#the-pcap-topology)
+* [HDFS Files](#the-files-on-hdfs)
+* [Configuration](#configuration)
+* [Starting the Topology](#starting-the-topology)
+* [Utilities](#utilities)
+ * [Inspector Utility](#inspector-utility)
+ * [Query Filter Utility](#query-filter-utility)
+* [Performance Tuning](#performance-tuning)
+
## The Sensors Feeding Kafka
This component must be fed by fast packet capture components upstream
@@ -150,3 +160,191 @@ The packet data will be exposed via the `packet` variable in Stellar.
The format of this regular expression is described [here](https://github.com/nishihatapalmer/byteseek/blob/master/sequencesyntax.md).
+## Performance Tuning
+The PCAP topology is extremely lightweight and functions as a spout-only topology. To tune it, users currently must specify a combination of
+properties in pcap.properties as well as configuration in the pcap remote.yaml flux file itself. Tuning the number of partitions in your Kafka topic
+will also have a dramatic impact on performance. In our testing we ran data into Kafka at 1.1 Gbps, which led us to configure 128 partitions for our Kafka topic
+along with the following settings in pcap.properties and remote.yaml (properties unrelated to performance have been removed):
+
+### pcap.properties file
+```
+spout.kafka.topic.pcap=pcap
+storm.topology.workers=16
+kafka.spout.parallelism=128
+kafka.pcap.numPackets=1000000000
+kafka.pcap.maxTimeMS=0
+hdfs.replication=1
+hdfs.sync.every=10000
+```
+You'll notice that the number of Kafka partitions equals the spout parallelism, and this is no coincidence. Kafka's per-partition ordering guarantees mean that you may have no more
+than one consumer per partition within a consumer group. Any additional parallelism will leave you with dormant threads that consume resources but perform no additional work. For our cluster with 4 Storm supervisors, we found 16 workers to
+provide optimal throughput as well. We were largely IO-bound rather than CPU-bound with the incoming PCAP data.
+
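+As a hypothetical sizing sketch (illustrative numbers, not measured), a topic created with 32 partitions would pair with:
+
+```
+kafka.spout.parallelism=32
+storm.topology.workers=4
+```
+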
+### remote.yaml
+In the flux file, we introduced the following configuration:
+
+```
+name: "pcap"
+config:
+ topology.workers: ${storm.topology.workers}
+ topology.worker.childopts: ${topology.worker.childopts}
+ topology.auto-credentials: ${storm.auto.credentials}
+ topology.acker.executors: 0
+components:
+
+ # Any kafka props for the consumer go here.
+ - id: "kafkaProps"
+ className: "java.util.HashMap"
+ configMethods:
+ - name: "put"
+ args:
+ - "value.deserializer"
+ - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+ - name: "put"
+ args:
+ - "key.deserializer"
+ - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
+ - name: "put"
+ args:
+ - "group.id"
+ - "pcap"
+ - name: "put"
+ args:
+ - "security.protocol"
+ - "${kafka.security.protocol}"
+ - name: "put"
+ args:
+ - "poll.timeout.ms"
+ - 100
+ - name: "put"
+ args:
+ - "offset.commit.period.ms"
+ - 30000
+ - name: "put"
+ args:
+ - "session.timeout.ms"
+ - 30000
+ - name: "put"
+ args:
+ - "max.uncommitted.offsets"
+ - 200000000
+ - name: "put"
+ args:
+ - "max.poll.interval.ms"
+ - 10
+ - name: "put"
+ args:
+ - "max.poll.records"
+ - 200000
+ - name: "put"
+ args:
+ - "receive.buffer.bytes"
+ - 431072
+ - name: "put"
+ args:
+ - "max.partition.fetch.bytes"
+ - 8097152
+
+ - id: "hdfsProps"
+ className: "java.util.HashMap"
+ configMethods:
+ - name: "put"
+ args:
+ - "io.file.buffer.size"
+ - 1000000
+ - name: "put"
+ args:
+ - "dfs.blocksize"
+ - 1073741824
+
+ - id: "kafkaConfig"
+ className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
+ constructorArgs:
+ - ref: "kafkaProps"
+ # topic name
+ - "${spout.kafka.topic.pcap}"
+ - "${kafka.zk}"
+ configMethods:
+ - name: "setFirstPollOffsetStrategy"
+ args:
+ # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+ - ${kafka.pcap.start}
+
+ - id: "writerConfig"
+ className: "org.apache.metron.spout.pcap.HDFSWriterConfig"
+ configMethods:
+ - name: "withOutputPath"
+ args:
+ - "${kafka.pcap.out}"
+ - name: "withNumPackets"
+ args:
+ - ${kafka.pcap.numPackets}
+ - name: "withMaxTimeMS"
+ args:
+ - ${kafka.pcap.maxTimeMS}
+ - name: "withZookeeperQuorum"
+ args:
+ - "${kafka.zk}"
+ - name: "withSyncEvery"
+ args:
+ - ${hdfs.sync.every}
+ - name: "withReplicationFactor"
+ args:
+ - ${hdfs.replication}
+ - name: "withHDFSConfig"
+ args:
+ - ref: "hdfsProps"
+ - name: "withDeserializer"
+ args:
+ - "${kafka.pcap.ts_scheme}"
+ - "${kafka.pcap.ts_granularity}"
+spouts:
+ - id: "kafkaSpout"
+ className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"
+ parallelism: ${kafka.spout.parallelism}
+ constructorArgs:
+ - ref: "kafkaConfig"
+ - ref: "writerConfig"
+
+```
+
+#### Flux Changes Introduced
+
+##### Topology Configuration
+
+The only change here is `topology.acker.executors: 0`, which disables Storm tuple acking for maximum throughput.
+
+##### Kafka Configuration
+
+```
+poll.timeout.ms
+offset.commit.period.ms
+session.timeout.ms
+max.uncommitted.offsets
+max.poll.interval.ms
+max.poll.records
+receive.buffer.bytes
+max.partition.fetch.bytes
+```
+
+##### Writer Configuration
+
+This is a combination of settings for the HDFSWriter (see the pcap.properties values above) as well as for HDFS itself.
+
+__HDFS config__
+
+Component config HashMap with the following properties:
+```
+io.file.buffer.size
+dfs.blocksize
+```
+
+__Writer config__
+
+References the HDFS props component specified above.
+```
+ - name: "withHDFSConfig"
+ args:
+ - ref: "hdfsProps"
+```
+
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
index 6e51dc5..7160178 100644
--- a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
+++ b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
@@ -15,12 +15,18 @@
# limitations under the License.
spout.kafka.topic.pcap=pcap
-storm.auto.credentials=[]
+topology.worker.childopts=
+topology.auto-credentials=[]
+topology.workers=1
kafka.zk=node1:2181
+hdfs.sync.every=1
+hdfs.replication.factor=-1
kafka.security.protocol=PLAINTEXT
-kafka.pcap.start=END
+# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
+kafka.pcap.start=UNCOMMITTED_EARLIEST
kafka.pcap.numPackets=1000
kafka.pcap.maxTimeMS=300000
kafka.pcap.ts_scheme=FROM_KEY
kafka.pcap.out=/apps/metron/pcap
kafka.pcap.ts_granularity=MICROSECONDS
+kafka.spout.parallelism=1
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
index 2b7e0fd..d7f6f2f 100644
--- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
+++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
@@ -16,8 +16,9 @@
name: "pcap"
config:
- topology.workers: 1
- topology.auto-credentials: ${storm.auto.credentials}
+ topology.workers: ${topology.workers}
+ topology.worker.childopts: ${topology.worker.childopts}
+ topology.auto-credentials: ${topology.auto-credentials}
components:
@@ -53,7 +54,7 @@ components:
- name: "setFirstPollOffsetStrategy"
args:
# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
- - "UNCOMMITTED_EARLIEST"
+ - ${kafka.pcap.start}
- id: "writerConfig"
className: "org.apache.metron.spout.pcap.HDFSWriterConfig"
@@ -70,6 +71,12 @@ components:
- name: "withZookeeperQuorum"
args:
- "${kafka.zk}"
+ - name: "withSyncEvery"
+ args:
+ - ${hdfs.sync.every}
+ - name: "withReplicationFactor"
+ args:
+ - ${hdfs.replication.factor}
- name: "withDeserializer"
args:
- "${kafka.pcap.ts_scheme}"
@@ -77,6 +84,7 @@ components:
spouts:
- id: "kafkaSpout"
className: "org.apache.metron.spout.pcap.KafkaToHDFSSpout"
+ parallelism: ${kafka.spout.parallelism}
constructorArgs:
- ref: "kafkaConfig"
- ref: "writerConfig"
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
index 43cd7e0..a6823e6 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java
@@ -19,14 +19,13 @@
package org.apache.metron.spout.pcap;
import com.google.common.base.Joiner;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
+import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;
import org.apache.storm.kafka.Callback;
import org.apache.storm.kafka.EmitContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.xml.bind.DatatypeConverter;
-import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,7 +37,7 @@ import java.util.Map;
*/
public class HDFSWriterCallback implements Callback {
static final long serialVersionUID = 0xDEADBEEFL;
- private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSWriterCallback.class);
/**
* A topic+partition. We split the files up by topic+partition so the writers don't clobber each other
@@ -80,29 +79,12 @@ public class HDFSWriterCallback implements Callback {
}
}
- /**
- * This is a static container of threadlocal LongWritables and BytesWritables. This keeps us from having to create so
- * many objects on the heap. The Deserializers update these for every packet.
- */
- private static class KeyValue {
- static ThreadLocal<LongWritable> key = new ThreadLocal<LongWritable> () {
- @Override
- protected LongWritable initialValue() {
- return new LongWritable();
- }
- };
- static ThreadLocal<BytesWritable> value = new ThreadLocal<BytesWritable> () {
- @Override
- protected BytesWritable initialValue() {
- return new BytesWritable();
- }
- };
- }
private HDFSWriterConfig config;
private EmitContext context;
private Map<Partition, PartitionHDFSWriter> writers = new HashMap<>();
private PartitionHDFSWriter lastWriter = null;
private String topic;
+ private boolean inited = false;
public HDFSWriterCallback() {
}
@@ -116,35 +98,43 @@ public class HDFSWriterCallback implements Callback {
public List<Object> apply(List<Object> tuple, EmitContext context) {
byte[] key = (byte[]) tuple.get(0);
byte[] value = (byte[]) tuple.get(1);
- if(!config.getDeserializer().deserializeKeyValue(key, value, KeyValue.key.get(), KeyValue.value.get())) {
- if(LOG.isDebugEnabled()) {
- List<String> debugStatements = new ArrayList<>();
- if(key != null) {
- debugStatements.add("Key length: " + key.length);
- debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key));
- }
- else {
- debugStatements.add("Key is null!");
- }
-
- if(value != null) {
- debugStatements.add("Value length: " + value.length);
- debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value));
- }
- else {
- debugStatements.add("Value is null!");
- }
- LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements));
+ long tsDeserializeStart = System.nanoTime();
+ KeyValueDeserializer.Result result = config.getDeserializer().deserializeKeyValue(key, value);
+ long tsDeserializeEnd = System.nanoTime();
+
+ if (LOG.isDebugEnabled() && !result.foundTimestamp) {
+ List<String> debugStatements = new ArrayList<>();
+ if (key != null) {
+ debugStatements.add("Key length: " + key.length);
+ debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key));
+ } else {
+ debugStatements.add("Key is null!");
}
+
+ if (value != null) {
+ debugStatements.add("Value length: " + value.length);
+ debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value));
+ } else {
+ debugStatements.add("Value is null!");
+ }
+ LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements));
}
+
+ long tsWriteStart = System.nanoTime();
try {
getWriter(new Partition( topic
, context.get(EmitContext.Type.PARTITION))
- ).handle(KeyValue.key.get(), KeyValue.value.get());
+ ).handle(result.key, result.value);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
//drop? not sure..
}
+ long tsWriteEnd = System.nanoTime();
+ if(LOG.isDebugEnabled() && (Math.random() < 0.001 || !inited)) {
+ LOG.debug("Deserialize time (ns): " + (tsDeserializeEnd - tsDeserializeStart));
+ LOG.debug("Write time (ns): " + (tsWriteEnd - tsWriteStart));
+ }
+ inited = true;
return tuple;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
index 66bb359..b6a2809 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java
@@ -25,7 +25,9 @@ import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Configure the HDFS Writer for PCap
@@ -34,9 +36,12 @@ public class HDFSWriterConfig implements Serializable {
static final long serialVersionUID = 0xDEADBEEFL;
private long numPackets;
private long maxTimeNS;
+ private int syncEvery = 1;
+ private int replicationFactor = -1;
private String outputPath;
private String zookeeperQuorum;
private KeyValueDeserializer deserializer;
+ private Map<String, Object> hdfsConfig = new HashMap<>();
/**
* Set the deserializer, the bit of logic that defines how the timestamp and packet are read.
@@ -70,6 +75,36 @@ public class HDFSWriterConfig implements Serializable {
}
/**
+ * The number of packets to write between syncs of the HDFS output stream.
+ * @param n
+ * @return
+ */
+ public HDFSWriterConfig withSyncEvery(int n) {
+ syncEvery = n;
+ return this;
+ }
+
+ /**
+ * The map of configuration properties to pass through to the HDFS client.
+ * @param config
+ * @return
+ */
+ public HDFSWriterConfig withHDFSConfig(Map<String, Object> config) {
+ hdfsConfig = config;
+ return this;
+ }
+
+ /**
+ * The HDFS replication factor to use. A value less than 1 leaves the cluster default in place.
+ * @param n
+ * @return
+ */
+ public HDFSWriterConfig withReplicationFactor(int n) {
+ replicationFactor = n;
+ return this;
+ }
+
+ /**
* The total amount of time (in ms) to write before a file is rolled.
* @param t
* @return
@@ -103,6 +138,10 @@ public class HDFSWriterConfig implements Serializable {
return out;
}
+ public Map<String, Object> getHDFSConfig() {
+ return hdfsConfig;
+ }
+
public Integer getZookeeperPort() {
if(zookeeperQuorum != null) {
String hostPort = Iterables.getFirst(Splitter.on(',').split(zookeeperQuorum), null);
@@ -112,6 +151,14 @@ public class HDFSWriterConfig implements Serializable {
return null;
}
+ public int getSyncEvery() {
+ return syncEvery;
+ }
+
+ public int getReplicationFactor() {
+ return replicationFactor;
+ }
+
public KeyValueDeserializer getDeserializer() {
return deserializer;
}
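As a usage sketch (not part of this commit), the new builder methods compose with the pre-existing ones that remote.yaml references; `withOutputPath`, `withNumPackets`, and `withMaxTimeMS` are assumed here with the argument types the flux file implies:
```
import java.util.HashMap;
import java.util.Map;

import org.apache.metron.spout.pcap.HDFSWriterConfig;

public class WriterConfigExample {
  public static HDFSWriterConfig tunedConfig() {
    // Pass-through HDFS client properties, mirroring the hdfsProps flux component.
    Map<String, Object> hdfsProps = new HashMap<>();
    hdfsProps.put("io.file.buffer.size", 1000000);
    hdfsProps.put("dfs.blocksize", 1073741824);

    return new HDFSWriterConfig()
            .withOutputPath("/apps/metron/pcap")
            .withNumPackets(1000000000)   // roll files by packet count, not time
            .withMaxTimeMS(0)             // <= 0 disables the time-based roll
            .withSyncEvery(10000)         // sync the HDFS stream every 10k packets
            .withReplicationFactor(1)     // values < 1 keep the cluster default
            .withHDFSConfig(hdfsProps);   // applied to the Hadoop Configuration at writer init
  }
}
```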
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
index f0ea1eb..86697db 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
@@ -27,18 +27,23 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.log4j.Logger;
import org.apache.metron.pcap.PcapHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
import java.util.EnumSet;
+import java.util.Map;
/**
* This class is intended to handle the writing of an individual file.
*/
public class PartitionHDFSWriter implements AutoCloseable, Serializable {
static final long serialVersionUID = 0xDEADBEEFL;
- private static final Logger LOG = Logger.getLogger(PartitionHDFSWriter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionHDFSWriter.class);
public static interface SyncHandler {
@@ -102,14 +107,43 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
private SyncHandler syncHandler;
private long batchStartTime;
private long numWritten;
+ private Configuration fsConfig = new Configuration();
public PartitionHDFSWriter(String topic, int partition, String uuid, HDFSWriterConfig config) {
this.topic = topic;
this.partition = partition;
this.uuid = uuid;
this.config = config;
+
try {
- this.fs = FileSystem.get(new Configuration());
+ int replicationFactor = config.getReplicationFactor();
+ if (replicationFactor > 0) {
+ fsConfig.set("dfs.replication", String.valueOf(replicationFactor));
+ }
+ if(config.getHDFSConfig() != null && !config.getHDFSConfig().isEmpty()) {
+ for(Map.Entry<String, Object> entry : config.getHDFSConfig().entrySet()) {
+ if(entry.getValue() instanceof Integer) {
+ fsConfig.setInt(entry.getKey(), (int)entry.getValue());
+ }
+ else if(entry.getValue() instanceof Boolean)
+ {
+ fsConfig.setBoolean(entry.getKey(), (Boolean) entry.getValue());
+ }
+ else if(entry.getValue() instanceof Long)
+ {
+ fsConfig.setLong(entry.getKey(), (Long) entry.getValue());
+ }
+ else if(entry.getValue() instanceof Float)
+ {
+ fsConfig.setFloat(entry.getKey(), (Float) entry.getValue());
+ }
+ else
+ {
+ fsConfig.set(entry.getKey(), String.valueOf(entry.getValue()));
+ }
+ }
+ }
+ this.fs = FileSystem.get(fsConfig);
} catch (IOException e) {
throw new RuntimeException("Unable to get FileSystem", e);
}
@@ -119,11 +153,14 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
return Long.toUnsignedString(ts);
}
- public void handle(LongWritable ts, BytesWritable value) throws IOException {
- turnoverIfNecessary(ts.get());
- writer.append(ts, value);
- syncHandler.sync(outputStream);
+ public void handle(long ts, byte[] value) throws IOException {
+ turnoverIfNecessary(ts);
+ BytesWritable bw = new BytesWritable(value);
+ writer.append(new LongWritable(ts), bw);
numWritten++;
+ if(numWritten % config.getSyncEvery() == 0) {
+ syncHandler.sync(outputStream);
+ }
}
public String getTopic() {
@@ -144,8 +181,8 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
outputStream.close();
}
}
- private Path getPath(long ts) {
+ private Path getPath(long ts) {
String fileName = PcapHelper.toFilename(topic, ts, partition + "", uuid);
return new Path(config.getOutputPath(), fileName);
}
@@ -157,7 +194,7 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
private void turnoverIfNecessary(long ts, boolean force) throws IOException {
long duration = ts - batchStartTime;
boolean initial = outputStream == null;
- boolean overDuration = duration >= config.getMaxTimeNS();
+ boolean overDuration = config.getMaxTimeNS() <= 0 ? false : Long.compareUnsigned(duration, config.getMaxTimeNS()) >= 0;
boolean tooManyPackets = numWritten >= config.getNumPackets();
if(force || initial || overDuration || tooManyPackets ) {
//turnover
@@ -183,14 +220,14 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
}
}
- writer = SequenceFile.createWriter(new Configuration()
+ writer = SequenceFile.createWriter(this.fsConfig
, SequenceFile.Writer.keyClass(LongWritable.class)
, SequenceFile.Writer.valueClass(BytesWritable.class)
, SequenceFile.Writer.stream(outputStream)
, SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)
);
//reset state
- LOG.info("Turning over and writing to " + path);
+ LOG.info("Turning over and writing to {}: [duration={} NS, force={}, initial={}, overDuration={}, tooManyPackets={}]", path, duration, force, initial, overDuration, tooManyPackets);
batchStartTime = ts;
numWritten = 0;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
index de1e24b..749d74c 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java
@@ -18,19 +18,18 @@
package org.apache.metron.spout.pcap.deserializer;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
import org.apache.metron.common.utils.timestamp.TimestampConverter;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.spout.pcap.Endianness;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Extract the timestamp from the key and raw data from the packet.
*/
public class FromKeyDeserializer extends KeyValueDeserializer {
- private static final Logger LOG = Logger.getLogger(FromKeyDeserializer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FromKeyDeserializer.class);
private static Endianness endianness = Endianness.getNativeEndianness();
@@ -39,13 +38,12 @@ public class FromKeyDeserializer extends KeyValueDeserializer {
}
@Override
- public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) {
- Long ts = converter.toNanoseconds(fromBytes(key));
- outKey.set(ts);
- byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, value, endianness);
- byte[] globalHeaderized= PcapHelper.addGlobalHeader(packetHeaderized, endianness);
- outValue.set(globalHeaderized, 0, globalHeaderized.length);
- return true;
+ public Result deserializeKeyValue(byte[] key, byte[] value) {
+ if (key == null) {
+ throw new IllegalArgumentException("Expected a key but none provided");
+ }
+ long ts = converter.toNanoseconds(fromBytes(key));
+ return new Result(ts, PcapHelper.addHeaders(ts, value, endianness), true);
}
/**
@@ -65,6 +63,6 @@ public class FromKeyDeserializer extends KeyValueDeserializer {
value |= (long)(b & 255);
}
- return Long.valueOf(value);
+ return value;
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
index 6098904..0ba92f8 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java
@@ -18,26 +18,24 @@
package org.apache.metron.spout.pcap.deserializer;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
import org.apache.metron.pcap.PcapHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Extract the timestamp and raw data from the packet.
*/
public class FromPacketDeserializer extends KeyValueDeserializer {
- private static final Logger LOG = Logger.getLogger(FromPacketDeserializer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FromPacketDeserializer.class);
@Override
- public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) {
+ public Result deserializeKeyValue(byte[] key, byte[] value) {
Long ts = PcapHelper.getTimestamp(value);
if(ts != null) {
- outKey.set(ts);
- outValue.set(value, 0, value.length);
- return true;
+ return new Result(ts, value, true);
}
else {
- return false;
+ return new Result(ts, value, false);
}
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
index 48bea87..311be04 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java
@@ -18,8 +18,6 @@
package org.apache.metron.spout.pcap.deserializer;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.metron.common.utils.timestamp.TimestampConverter;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
@@ -28,6 +26,17 @@ import java.io.Serializable;
public abstract class KeyValueDeserializer implements Serializable {
protected TimestampConverter converter;
+ public static class Result {
+ public byte[] value;
+ public Long key;
+ public boolean foundTimestamp;
+ public Result(Long key, byte[] value, boolean foundTimestamp) {
+ this.key = key;
+ this.value = value;
+ this.foundTimestamp = foundTimestamp;
+ }
+ }
+
public KeyValueDeserializer() {
this(TimestampConverters.MICROSECONDS);
}
@@ -36,6 +45,6 @@ public abstract class KeyValueDeserializer implements Serializable {
this.converter = converter;
}
- public abstract boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue);
+ public abstract Result deserializeKeyValue(byte[] key, byte[] value);
}
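For contrast with the two shipped implementations, a minimal sketch of a third deserializer written against the new Result-based contract (`FixedTimestampDeserializer` is hypothetical, for illustration only):
```
package org.apache.metron.spout.pcap.deserializer;

/**
 * Hypothetical example: stamps every packet with a fixed timestamp instead of
 * reading one from the Kafka key (FromKeyDeserializer) or from the packet
 * data itself (FromPacketDeserializer).
 */
public class FixedTimestampDeserializer extends KeyValueDeserializer {
  private final long tsNanos;

  public FixedTimestampDeserializer(long tsNanos) {
    this.tsNanos = tsNanos;
  }

  @Override
  public Result deserializeKeyValue(byte[] key, byte[] value) {
    // foundTimestamp is true because this deserializer always supplies one.
    return new Result(tsNanos, value, true);
  }
}
```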
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
index f460db3..c887606 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
@@ -35,7 +35,10 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
public class PcapInspector {
private static abstract class OptionHandler implements Function<String, Option> {}
@@ -136,20 +139,24 @@ public class PcapInspector {
LongWritable key = new LongWritable();
BytesWritable value = new BytesWritable();
- for(int i = 0;(n < 0 || i < n) && reader.next(key, value);++i) {
+ for (int i = 0; (n < 0 || i < n) && reader.next(key, value); ++i) {
long millis = Long.divideUnsigned(key.get(), 1000000);
String ts = DATE_FORMAT.format(new Date(millis));
- for(PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) {
- Map<String, Object> result = PcapHelper.packetToFields(pi);
- List<String> fieldResults = new ArrayList<String>() {{
- add("TS: " + ts);
- }};
- for(Constants.Fields field : Constants.Fields.values()) {
- if(result.containsKey(field.getName())) {
- fieldResults.add(field.getName() + ": " + result.get(field.getName()));
+ try {
+ for (PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) {
+ Map<String, Object> result = PcapHelper.packetToFields(pi);
+ List<String> fieldResults = new ArrayList<String>() {{
+ add("TS: " + ts);
+ }};
+ for (Constants.Fields field : Constants.Fields.values()) {
+ if (result.containsKey(field.getName())) {
+ fieldResults.add(field.getName() + ": " + result.get(field.getName()));
+ }
}
+ System.out.println(Joiner.on(",").join(fieldResults));
}
- System.out.println(Joiner.on(",").join(fieldResults));
+ } catch (Exception e) {
+ System.out.println(String.format("Error: malformed packet #=%s, ts=%s, error msg=%s", i + 1, ts, e.getMessage()));
}
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index a869723..d6d54dc 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -213,14 +213,19 @@ public class PcapTopologyIntegrationTest {
final List<Map.Entry<byte[], byte[]>> pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders));
Assert.assertTrue(Iterables.size(pcapEntries) > 0);
final Properties topologyProperties = new Properties() {{
+ setProperty("topology.workers", "1");
+ setProperty("topology.worker.childopts", "");
setProperty("spout.kafka.topic.pcap", KAFKA_TOPIC);
- setProperty("kafka.pcap.start", "BEGINNING");
+ setProperty("kafka.pcap.start", "EARLIEST");
setProperty("kafka.pcap.out", outDir.getAbsolutePath());
setProperty("kafka.pcap.numPackets", "2");
setProperty("kafka.pcap.maxTimeMS", "200000000");
setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
- setProperty("storm.auto.credentials", "[]");
+ setProperty("kafka.spout.parallelism", "1");
+ setProperty("topology.auto-credentials", "[]");
setProperty("kafka.security.protocol", "PLAINTEXT");
+ setProperty("hdfs.sync.every", "1");
+ setProperty("hdfs.replication.factor", "-1");
}};
updatePropertiesCallback.apply(topologyProperties);
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java
new file mode 100644
index 0000000..1d49103
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializerTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.deserializer;
+
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class FromKeyDeserializerTest {
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void empty_or_null_key_throws_illegal_argument_exception() {
+ exception.expect(IllegalArgumentException.class);
+ exception.expectMessage("Expected a key but none provided");
+
+ FromKeyDeserializer deserializer = new FromKeyDeserializer(TimestampConverters.NANOSECONDS);
+ deserializer.deserializeKeyValue(null, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
index ebd7ac7..e1ad3ca 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -39,9 +39,10 @@ import org.krakenapps.pcap.util.ByteOrderConverter;
import java.io.EOFException;
import java.io.IOException;
-import java.util.*;
-
-import static org.apache.metron.pcap.Constants.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class PcapHelper {
@@ -81,6 +82,14 @@ public class PcapHelper {
}
}
+ /**
+ *
+ * @param topic
+ * @param timestamp
+ * @param partition kafka partition
+ * @param uuid
+ * @return filename in this format: pcap_topic_timestamp_partition_uuid, e.g. pcap_pcap_1494886105667571000_0_pcap-8-1494965816
+ */
public static String toFilename(String topic, long timestamp, String partition, String uuid)
{
return Joiner.on("_").join("pcap"
@@ -163,6 +172,44 @@ public class PcapHelper {
}
return null;
}
+
+ public static byte[] addHeaders(long tsNano, byte[] packet, Endianness endianness) {
+ byte[] ret = new byte[GLOBAL_HEADER_SIZE + PACKET_HEADER_SIZE + packet.length];
+ byte[] globalHeader = getPcapGlobalHeader(endianness);
+ int offset = 0;
+ System.arraycopy(globalHeader, 0, ret, offset, GLOBAL_HEADER_SIZE);
+ offset += globalHeader.length;
+ {
+ boolean swapBytes = swapBytes(endianness);
+ long micros = Long.divideUnsigned(tsNano, 1000);
+ int secs = (int)(micros / 1000000);
+ int usec = (int)(micros % 1000000);
+ int capLen = packet.length;
+ {
+ byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(secs):secs);
+ System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+ offset += Integer.BYTES;
+ }
+ {
+ byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(usec):usec);
+ System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+ offset += Integer.BYTES;
+ }
+ {
+ byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+ System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+ offset += Integer.BYTES;
+ }
+ {
+ byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+ System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+ offset += Integer.BYTES;
+ }
+ }
+ System.arraycopy(packet, 0, ret, offset, packet.length);
+ return ret;
+ }
+
public static byte[] addGlobalHeader(byte[] packet, Endianness endianness) {
byte[] globalHeader = getPcapGlobalHeader(endianness);
byte[] ret = new byte[packet.length + GLOBAL_HEADER_SIZE];
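For orientation, a small hypothetical helper showing the timestamp arithmetic `addHeaders` performs and the record layout it writes (assuming the standard libpcap sizes of 24 bytes for the global header and 16 for the per-record header):
```
// Per-record layout: ts_sec(4) | ts_usec(4) | incl_len(4) | orig_len(4).
// addHeaders writes capLen into both length fields, since packets are
// stored untruncated.
public class RecordHeaderMath {
  public static int[] splitTimestamp(long tsNano) {
    long micros = Long.divideUnsigned(tsNano, 1000);
    int secs = (int) (micros / 1000000);  // whole seconds since the epoch
    int usec = (int) (micros % 1000000);  // microsecond remainder
    // e.g. tsNano = 1494886105667571000 -> secs = 1494886105, usec = 667571
    return new int[] { secs, usec };
  }
}
```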
http://git-wip-us.apache.org/repos/asf/metron/blob/c0b08252/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 3d4a4b3..8d40e5f 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -52,6 +52,11 @@ public class PcapJob {
public static final String START_TS_CONF = "start_ts";
public static final String END_TS_CONF = "end_ts";
public static final String WIDTH_CONF = "width";
+
+ public static enum PCAP_COUNTER {
+ MALFORMED_PACKET_COUNT
+ }
+
public static class PcapPartitioner extends Partitioner<LongWritable, BytesWritable> implements Configurable {
private Configuration configuration;
Long start = null;
@@ -110,15 +115,23 @@ public class PcapJob {
// object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo
// objects back to byte arrays, otherwise we could support more than one packet.
// Note: short-circuit findAny() func on stream
- boolean send = filteredPacketInfo(value).findAny().isPresent();
+ List<PacketInfo> packetInfos;
+ try {
+ packetInfos = PcapHelper.toPacketInfo(value.copyBytes());
+ } catch(Exception e) {
+ // toPacketInfo is throwing RuntimeExceptions. Attempt to catch and count errors with malformed packets
+ context.getCounter(PCAP_COUNTER.MALFORMED_PACKET_COUNT).increment(1);
+ return;
+ }
+ boolean send = filteredPacketInfo(packetInfos).findAny().isPresent();
if (send) {
context.write(key, value);
}
}
}
- private Stream<PacketInfo> filteredPacketInfo(BytesWritable value) throws IOException {
- return PcapHelper.toPacketInfo(value.copyBytes()).stream().filter(filter);
+ private Stream<PacketInfo> filteredPacketInfo(List<PacketInfo> packetInfos) throws IOException {
+ return packetInfos.stream().filter(filter);
}
}
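Once a query job completes, the new counter can be read back through the standard Hadoop counters API; a minimal sketch, where `job` is assumed to be the finished `org.apache.hadoop.mapreduce.Job`:
```
import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.metron.pcap.mr.PcapJob;

public class MalformedCountExample {
  // After job.waitForCompletion(true) returns, report how many malformed
  // packets the mapper skipped rather than failing the whole query.
  public static long malformedCount(Job job) throws IOException {
    return job.getCounters()
              .findCounter(PcapJob.PCAP_COUNTER.MALFORMED_PACKET_COUNT)
              .getValue();
  }
}
```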