You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/04/29 18:28:32 UTC
[2/6] incubator-metron git commit: METRON-119 - Move PCAP
infrastructure from HBase closes apache/incubator-metron#93
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
new file mode 100644
index 0000000..49fd476
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.log4j.Logger;
+import org.apache.metron.pcap.PcapHelper;
+
+import java.io.*;
+import java.util.EnumSet;
+
+public class PartitionHDFSWriter implements AutoCloseable, Serializable {
+ static final long serialVersionUID = 0xDEADBEEFL;
+ private static final Logger LOG = Logger.getLogger(PartitionHDFSWriter.class);
+
+
+ public static interface SyncHandler {
+ void sync(FSDataOutputStream outputStream) throws IOException;
+ }
+
+ public static enum SyncHandlers implements SyncHandler{
+ DEFAULT(new SyncHandler() {
+
+ public void sync(FSDataOutputStream outputStream) throws IOException {
+ outputStream.hflush();
+ outputStream.hsync();
+ }
+ })
+ ,HDFS(new SyncHandler() {
+ public void sync(FSDataOutputStream outputStream) throws IOException{
+
+ outputStream.hflush();
+ outputStream.hsync();
+ ((HdfsDataOutputStream)outputStream).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+ }
+ })
+ ,LOCAL(new SyncHandler() {
+
+ @Override
+ public void sync(FSDataOutputStream outputStream) throws IOException {
+ outputStream.getWrappedStream().flush();
+ outputStream.getWrappedStream();
+ }
+ })
+ ;
+ private SyncHandler func;
+ SyncHandlers(SyncHandler func) {
+ this.func = func;
+ }
+
+ private SyncHandler getHandler() {
+ return func;
+ }
+
+ @Override
+ public void sync(FSDataOutputStream input) throws IOException {
+ func.sync(input);
+ }
+ }
+
+
+ private String topic;
+ private int partition;
+ private String uuid;
+ private FileSystem fs;
+ private FSDataOutputStream outputStream;
+ private SequenceFile.Writer writer;
+ private HDFSWriterConfig config;
+ private SyncHandler syncHandler;
+ private long batchStartTime;
+ private long numWritten;
+
+ public PartitionHDFSWriter(String topic, int partition, String uuid, HDFSWriterConfig config) {
+ this.topic = topic;
+ this.partition = partition;
+ this.uuid = uuid;
+ this.config = config;
+ try {
+ this.fs = FileSystem.get(new Configuration());
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to get FileSystem", e);
+ }
+ }
+
+ public String timestampToString(long ts) {
+ return Long.toUnsignedString(ts);
+ }
+
+ public void handle(LongWritable ts, BytesWritable value) throws IOException {
+ turnoverIfNecessary(ts.get());
+ writer.append(ts, new BytesWritable(value.getBytes()));
+ syncHandler.sync(outputStream);
+ numWritten++;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ if(writer != null) {
+ writer.close();
+ }
+ if(outputStream != null) {
+ outputStream.close();
+ }
+ }
+ private Path getPath(long ts) {
+
+ String fileName = PcapHelper.toFilename(topic, ts, partition + "", uuid);
+ return new Path(config.getOutputPath(), fileName);
+ }
+
+ private void turnoverIfNecessary(long ts) throws IOException {
+ turnoverIfNecessary(ts, false);
+ }
+
+ private void turnoverIfNecessary(long ts, boolean force) throws IOException {
+ long duration = ts - batchStartTime;
+ boolean initial = outputStream == null;
+ boolean overDuration = duration >= config.getMaxTimeNS();
+ boolean tooManyPackets = numWritten >= config.getNumPackets();
+ if(force || initial || overDuration || tooManyPackets ) {
+ //turnover
+ Path path = getPath(ts);
+ close();
+
+ if(fs instanceof LocalFileSystem) {
+ outputStream = new FSDataOutputStream(new FileOutputStream(new File(path.toString())));
+ syncHandler = SyncHandlers.LOCAL.getHandler();
+ }
+ else {
+ outputStream = fs.create(path, true);
+ if (outputStream instanceof HdfsDataOutputStream) {
+ if (initial) {
+ LOG.info("Using the HDFS sync handler.");
+ }
+ syncHandler = SyncHandlers.HDFS.getHandler();
+ } else {
+ if (initial) {
+ LOG.info("Using the default sync handler, which cannot guarantee atomic appends at the record level, be forewarned!");
+ }
+ syncHandler = SyncHandlers.DEFAULT.getHandler();
+ }
+
+ }
+ writer = SequenceFile.createWriter(new Configuration()
+ , SequenceFile.Writer.keyClass(LongWritable.class)
+ , SequenceFile.Writer.valueClass(BytesWritable.class)
+ , SequenceFile.Writer.stream(outputStream)
+ , SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)
+ );
+ //reset state
+ LOG.info("Turning over and writing to " + path);
+ batchStartTime = ts;
+ numWritten = 0;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
new file mode 100644
index 0000000..99f9229
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap;
+
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.spout.pcap.scheme.TimestampScheme;
+import storm.kafka.BrokerHosts;
+
+/**
+ * Kafka spout configuration for the pcap topology, adding a way to pick the timestamp scheme.
+ */
+public class SpoutConfig extends org.apache.metron.common.spout.kafka.SpoutConfig {
+
+  /** Forwards every argument unchanged to the Metron Kafka SpoutConfig constructor. */
+  public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
+    super(hosts, topic, zkRoot, id);
+  }
+
+  /**
+   * Installs the deserialization scheme resolved by TimestampScheme.getScheme.
+   *
+   * @param scheme      name of the timestamp scheme
+   * @param granularity name passed to TimestampConverters.getConverter
+   * @return this config, for chaining
+   */
+  public SpoutConfig withTimestampScheme(String scheme, String granularity) {
+    this.scheme = TimestampScheme.getScheme(
+        scheme,
+        TimestampConverters.getConverter(granularity));
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
new file mode 100644
index 0000000..28aae7a
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.scheme;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.spout.pcap.Endianness;
+import storm.kafka.KeyValueScheme;
+
+import java.util.List;
+
+/**
+ * Key/value scheme that takes the packet timestamp from the Kafka message key, then prepends
+ * pcap packet and global headers to the raw packet bytes in the value.
+ */
+public class FromKeyScheme implements KeyValueScheme, KeyConvertible {
+  private static final Logger LOG = Logger.getLogger(FromKeyScheme.class);
+
+  // Converts the key's timestamp to nanoseconds; defaults to treating the key as microseconds.
+  private TimestampConverter converter = TimestampConverters.MICROSECONDS;
+  // Byte order for the synthesized pcap headers (presumably the host's native order — see Endianness).
+  private static final Endianness endianness = Endianness.getNativeEndianness();
+
+  /**
+   * Emits a single-field tuple holding [LongWritable(ts in ns), BytesWritable(pcap-framed packet)].
+   */
+  @Override
+  public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
+    Long ts = converter.toNanoseconds(Bytes.toLong(key));
+    byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, value, endianness);
+    byte[] globalHeaderized = PcapHelper.addGlobalHeader(packetHeaderized, endianness);
+    return new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(globalHeaderized)));
+  }
+
+  /** Not supported: this scheme needs the message key to obtain the timestamp. */
+  @Override
+  public List<Object> deserialize(byte[] ser) {
+    throw new UnsupportedOperationException("Really only interested in deserializing a key and a value");
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(TimestampScheme.KV_FIELD);
+  }
+
+  /**
+   * Sets the converter used to interpret the key's timestamp.
+   *
+   * @return this scheme, for chaining
+   */
+  @Override
+  public FromKeyScheme withTimestampConverter(TimestampConverter converter) {
+    // A plain assignment cannot throw; the former IllegalArgumentException handler was dead code.
+    this.converter = converter;
+    return this;
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
new file mode 100644
index 0000000..9b94e2b
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.scheme;
+
+import backtype.storm.spout.MultiScheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+import org.apache.metron.pcap.PcapHelper;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Scheme that reads the timestamp from the packet bytes themselves via PcapHelper.getTimestamp.
+ */
+public class FromPacketScheme implements MultiScheme, KeyConvertible {
+  private static final Logger LOG = Logger.getLogger(FromPacketScheme.class);
+
+  /**
+   * Emits one single-field tuple. The field is [LongWritable(ts), BytesWritable(rawValue)] when a
+   * timestamp can be extracted, otherwise an empty list.
+   */
+  @Override
+  public Iterable<List<Object>> deserialize(byte[] rawValue) {
+    Long ts = PcapHelper.getTimestamp(rawValue);
+    if (ts == null) {
+      // Typed emptyList() replaces the raw Collections.EMPTY_LIST (same shared empty instance).
+      return ImmutableList.of(new Values(Collections.emptyList()));
+    }
+    return ImmutableList.of(new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(rawValue))));
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(TimestampScheme.KV_FIELD);
+  }
+
+  /** The converter is ignored: the timestamp comes from the packet, not from a key. */
+  @Override
+  public FromPacketScheme withTimestampConverter(TimestampConverter converter) {
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java
new file mode 100644
index 0000000..54e52e8
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.spout.pcap.scheme;
+
+
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+
+/**
+ * Implemented by spout schemes that can accept a TimestampConverter.
+ */
+public interface KeyConvertible {
+  /**
+   * Supplies the converter for the scheme's timestamps; implementations may ignore it.
+   *
+   * @return the configured scheme (the implementations in this package return {@code this})
+   */
+  public KeyConvertible withTimestampConverter(TimestampConverter converter);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
new file mode 100644
index 0000000..3f98c44
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.scheme;
+
+import backtype.storm.spout.MultiScheme;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+import storm.kafka.KeyValueSchemeAsMultiScheme;
+
+import java.util.Locale;
+
+/**
+ * Named strategies for extracting the packet timestamp, each producing a Storm MultiScheme.
+ */
+public enum TimestampScheme {
+  // Timestamp comes from the Kafka message key (FromKeyScheme); also the fallback for unknown names.
+  FROM_KEY(converter -> new KeyValueSchemeAsMultiScheme(new FromKeyScheme().withTimestampConverter(converter))),
+  // Timestamp is read from the packet bytes (FromPacketScheme).
+  FROM_PACKET(converter -> new FromPacketScheme().withTimestampConverter(converter));
+
+  /** Name of the single tuple field emitted by the pcap schemes. */
+  public static final String KV_FIELD = "kv";
+
+  final TimestampSchemeCreator creator;
+
+  TimestampScheme(TimestampSchemeCreator creator) {
+    this.creator = creator;
+  }
+
+  /**
+   * Builds the scheme named {@code scheme} (case-insensitive), falling back to FROM_KEY when the
+   * name is null or unknown, or when building the requested scheme throws IllegalArgumentException.
+   */
+  public static MultiScheme getScheme(String scheme, TimestampConverter converter) {
+    if (scheme == null) {
+      return FROM_KEY.creator.create(converter);
+    }
+    try {
+      // Locale.ROOT keeps upper-casing independent of the JVM's default locale (e.g. Turkish 'i').
+      return valueOf(scheme.toUpperCase(Locale.ROOT)).creator.create(converter);
+    } catch (IllegalArgumentException iae) {
+      return FROM_KEY.creator.create(converter);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
new file mode 100644
index 0000000..e88926b
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap.scheme;
+
+import backtype.storm.spout.MultiScheme;
+import org.apache.metron.common.utils.timestamp.TimestampConverter;
+
+/**
+ * Factory that builds a Storm MultiScheme configured with the given TimestampConverter.
+ */
+@FunctionalInterface
+public interface TimestampSchemeCreator {
+  public MultiScheme create(TimestampConverter converter);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
new file mode 100644
index 0000000..661ab2f
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/utils/PcapInspector.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.utils;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import org.apache.commons.cli.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.List;
+
+/**
+ * Command-line tool that dumps the packets stored in a pcap SequenceFile, one line per packet.
+ */
+public class PcapInspector {
+  private static abstract class OptionHandler implements Function<String, Option> {}
+
+  /** Command-line options understood by the inspector. */
+  private enum InspectorOptions {
+    HELP("h", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        return new Option(s, "help", false, "Generate Help screen");
+      }
+    }),
+    INPUT("i", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "input", true, "Input sequence file on HDFS");
+        o.setArgName("SEQ_FILE");
+        o.setRequired(true);
+        return o;
+      }
+    }),
+    NUM("n", new OptionHandler() {
+      @Nullable
+      @Override
+      public Option apply(@Nullable String s) {
+        Option o = new Option(s, "num_packets", true, "Number of packets to dump");
+        o.setArgName("N");
+        o.setRequired(false);
+        return o;
+      }
+    });
+
+    final Option option;
+    final String shortCode;
+
+    InspectorOptions(String shortCode, OptionHandler optionHandler) {
+      this.shortCode = shortCode;
+      this.option = optionHandler.apply(shortCode);
+    }
+
+    public boolean has(CommandLine cli) {
+      return cli.hasOption(shortCode);
+    }
+
+    public String get(CommandLine cli) {
+      return cli.getOptionValue(shortCode);
+    }
+
+    /** Parses args; prints help and exits on -h (status 0) or on a parse error (status -1). */
+    public static CommandLine parse(CommandLineParser parser, String[] args) {
+      try {
+        CommandLine cli = parser.parse(getOptions(), args);
+        if (InspectorOptions.HELP.has(cli)) {
+          printHelp();
+          System.exit(0);
+        }
+        return cli;
+      } catch (ParseException e) {
+        System.err.println("Unable to parse args: " + Joiner.on(' ').join(args));
+        e.printStackTrace(System.err);
+        printHelp();
+        System.exit(-1);
+        return null;
+      }
+    }
+
+    public static void printHelp() {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("PcapInspector", getOptions());
+    }
+
+    public static Options getOptions() {
+      Options ret = new Options();
+      for (InspectorOptions o : InspectorOptions.values()) {
+        ret.addOption(o.option);
+      }
+      return ret;
+    }
+  }
+
+  public static final DateFormat DATE_FORMAT = SimpleDateFormat.getDateTimeInstance( SimpleDateFormat.LONG
+                                                                                   , SimpleDateFormat.LONG
+                                                                                   );
+
+  /**
+   * Entry point: {@code -i SEQ_FILE [-n N]} plus standard Hadoop generic options.
+   * Dumps all records, or the first N records when N is given.
+   */
+  public static void main(String... argv) throws IOException {
+    Configuration conf = new Configuration();
+    String[] otherArgs = new GenericOptionsParser(conf, argv).getRemainingArgs();
+    CommandLine cli = InspectorOptions.parse(new PosixParser(), otherArgs);
+    Path inputPath = new Path(InspectorOptions.INPUT.get(cli));
+    int n = -1;
+    if (InspectorOptions.NUM.has(cli)) {
+      n = Integer.parseInt(InspectorOptions.NUM.get(cli));
+    }
+    // Read with the Configuration GenericOptionsParser populated so -D/-fs/-conf take effect,
+    // and close the reader when done.
+    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(inputPath))) {
+      LongWritable key = new LongWritable();
+      BytesWritable value = new BytesWritable();
+      for (int i = 0; (n < 0 || i < n) && reader.next(key, value); ++i) {
+        printRecord(key, value);
+      }
+    }
+  }
+
+  /** Prints one comma-joined line per packet in the record: the formatted timestamp, then each extracted field. */
+  private static void printRecord(LongWritable key, BytesWritable value) throws IOException {
+    // The key is divided (unsigned) by 10^6 to get epoch milliseconds, i.e. it is read as nanoseconds.
+    long millis = Long.divideUnsigned(key.get(), 1000000);
+    String ts = DATE_FORMAT.format(new Date(millis));
+    for (PacketInfo pi : PcapHelper.toPacketInfo(value.copyBytes())) {
+      EnumMap<Constants.Fields, Object> result = PcapHelper.packetToFields(pi);
+      List<String> fieldResults = new ArrayList<>();
+      fieldResults.add("TS: " + ts);
+      for (Constants.Fields field : Constants.Fields.values()) {
+        if (result.containsKey(field)) {
+          fieldResults.add(field + ": " + result.get(field));
+        }
+      }
+      System.out.println(Joiner.on(",").join(fieldResults));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/scripts/pcap_inspector.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/scripts/pcap_inspector.sh b/metron-platform/metron-pcap-backend/src/main/scripts/pcap_inspector.sh
new file mode 100755
index 0000000..a5a24ae
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/scripts/pcap_inspector.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Optionally source the bigtop hbase defaults file when it is present and readable.
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+[ -n "${BIGTOP_DEFAULTS_DIR}" ] && [ -r "${BIGTOP_DEFAULTS_DIR}/hbase" ] && . "${BIGTOP_DEFAULTS_DIR}/hbase"
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+
+# The project.version / project.artifactId placeholders are presumably filled in by Maven
+# resource filtering at build time.
+export METRON_VERSION=${project.version}
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=${project.artifactId}-$METRON_VERSION.jar
+
+# Quote the jar path so a METRON_HOME containing spaces is not word-split; pass user args through.
+yarn jar "$METRON_HOME/lib/$TOPOLOGIES_JAR" org.apache.metron.utils.PcapInspector "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/main/scripts/start_pcap_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/scripts/start_pcap_topology.sh b/metron-platform/metron-pcap-backend/src/main/scripts/start_pcap_topology.sh
new file mode 100644
index 0000000..a38f905
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/main/scripts/start_pcap_topology.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# The project.version / project.artifactId placeholders are presumably filled in by Maven
+# resource filtering at build time.
+export METRON_VERSION=${project.version}
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=${project.artifactId}-$METRON_VERSION.jar
+
+# Submit the pcap Flux topology remotely; paths are quoted to avoid word-splitting.
+storm jar "$METRON_HOME/lib/$TOPOLOGIES_JAR" org.apache.storm.flux.Flux --remote "$METRON_HOME/flux/pcap/remote.yaml" --filter "$METRON_HOME/config/pcap.properties"
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
new file mode 100644
index 0000000..70c973a
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.*;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.mr.PcapFilter;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.EnumMap;
+
+public class PcapFilterTest {
+ @Test
+ public void testTrivialEquality() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ PcapJob.addToConfig(fields, config);
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ Assert.assertTrue(filter.apply(null));
+ }
+ }
+
+ @Test
+ public void testReverseTraffic() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+ }};
+ PcapJob.addToConfig(fields, config);
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ Assert.assertTrue(filter.apply(null));
+ }
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "dst_ip");
+ put(Constants.Fields.SRC_PORT, 1);
+ put(Constants.Fields.DST_ADDR, "src_ip");
+ put(Constants.Fields.DST_PORT, 0);
+ }};
+ }
+ };
+ Assert.assertTrue(filter.apply(null));
+ }
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "dst_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "src_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ Assert.assertFalse(filter.apply(null));
+ }
+ }
+@Test
+public void testMissingDstAddr() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ PcapJob.addToConfig(fields, config);
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ Assert.assertTrue(filter.apply(null));
+ }
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip1");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ Assert.assertFalse(filter.apply(null));
+ }
+}
+ @Test
+ public void testMissingDstPort() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ PcapJob.addToConfig(fields, config);
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ Assert.assertTrue(filter.apply(null));
+ }
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 100);
+ }};
+ }
+ };
+ Assert.assertTrue(filter.apply(null));
+ }
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 100);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 100);
+ }};
+ }
+ };
+ Assert.assertFalse(filter.apply(null));
+ }
+ }
+ @Test
+ public void testMissingSrcAddr() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ PcapJob.addToConfig(fields, config);
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ Assert.assertTrue(filter.apply(null));
+ }
+ }
+ @Test
+ public void testMissingSrcPort() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ PcapJob.addToConfig(fields, config);
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ Assert.assertTrue(filter.apply(null));
+ }
+ {
+ PcapFilter filter = new PcapFilter(config) {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 100);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ Assert.assertTrue(filter.apply(null));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
new file mode 100644
index 0000000..17c9325
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PcapJobTest {
+
+  /**
+   * Fixed upper bound for the "everything so far" queries: 2017-07-14T02:40:00Z in nanoseconds.
+   * It lies after every 2016 fixture timestamp and before the 2019 one in test_getPaths_leftEdge.
+   * The bound used to be System.currentTimeMillis(); that began admitting the 2019 file once its
+   * date passed, and test_getPaths_leftEdge then failed. A constant keeps these tests
+   * independent of the wall clock.
+   */
+  private static final long FIXED_QUERY_END_NS = TimestampConverters.MILLISECONDS.toNanoseconds(1500000000000L);
+
+  /** Converts the given path strings into a mutable list of Hadoop {@link Path}s. */
+  private static List<Path> paths(String... names) {
+    List<Path> ret = new ArrayList<>();
+    for (String name : names) {
+      ret.add(new Path(name));
+    }
+    return ret;
+  }
+
+  /** Creates a PcapJob whose listFiles ignores its arguments and returns {@code inputFiles}. */
+  private static PcapJob jobOver(final List<Path> inputFiles) {
+    return new PcapJob() {
+      @Override
+      protected Iterable<Path> listFiles(FileSystem fs, Path basePath) throws IOException {
+        return inputFiles;
+      }
+    };
+  }
+
+  @Test
+  public void test_getPaths_NoFiles() throws Exception {
+    PcapJob job = jobOver(paths());
+    Iterable<String> result = job.getPaths(null, null, 0, 1000);
+    Assert.assertTrue(Iterables.isEmpty(result));
+  }
+
+  @Test
+  public void test_getPaths_leftEdge() throws Exception {
+    PcapJob job = jobOver(paths(
+        "/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+      , "/apps/metron/pcap/pcap_pcap_1561589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+    ));
+    // Only the 2016 file is at or before the fixed 2017 upper bound.
+    Iterable<String> result = job.getPaths(null, null, 0, FIXED_QUERY_END_NS);
+    Assert.assertEquals(1, Iterables.size(result));
+  }
+
+  @Test
+  public void test_getPaths_rightEdge() throws Exception {
+    {
+      PcapJob job = jobOver(paths(
+          "/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+        , "/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+      ));
+      Iterable<String> result = job.getPaths(null, null, 1461589333993573000L - 1L, 1461589333993573000L + 1L);
+      Assert.assertEquals(2, Iterables.size(result));
+    }
+    {
+      PcapJob job = jobOver(paths(
+          "/apps/metron/pcap/pcap0_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+        , "/apps/metron/pcap/pcap1_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+        , "/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+      ));
+      Iterable<String> result = job.getPaths(null, null, 1461589334993573000L - 1L, 1461589334993573000L + 1L);
+      Assert.assertEquals(2, Iterables.size(result));
+    }
+  }
+
+  @Test
+  public void test_getPaths_bothEdges() throws Exception {
+    PcapJob job = jobOver(paths(
+        "/apps/metron/pcap/pcap_pcap_1461589332993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+      , "/apps/metron/pcap/pcap_pcap_1461589333993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+      , "/apps/metron/pcap/pcap1_pcap_1461589334993573000_0_73686171-64a1-46e5-9e67-66cf603fb094"
+    ));
+    Iterable<String> result = job.getPaths(null, null, 0, FIXED_QUERY_END_NS);
+    Assert.assertEquals(3, Iterables.size(result));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
new file mode 100644
index 0000000..587ec7c
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -0,0 +1,398 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.integration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import kafka.consumer.ConsumerIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.metron.common.Constants;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.components.FluxTopologyComponent;
+import org.apache.metron.integration.components.KafkaWithZKComponent;
+import org.apache.metron.integration.components.MRComponent;
+import org.apache.metron.integration.utils.KafkaUtil;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.PcapMerger;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.spout.pcap.Endianness;
+import org.apache.metron.spout.pcap.scheme.TimestampScheme;
+import org.apache.metron.test.utils.UnitTestHelper;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.*;
+import java.util.*;
+
+public class PcapTopologyIntegrationTest {
+  final static String KAFKA_TOPIC = "pcap";
+  private static final String BASE_DIR = "pcap";
+  private static final String DATA_DIR = BASE_DIR + "/data_dir";
+  private static final String QUERY_DIR = BASE_DIR + "/query";
+  private String topologiesDir = "src/main/flux";
+  private String targetDir = "target";
+
+  /** Returns {@code <targetDir>/pcap/data_dir}, creating it if needed; used as the topology's kafka.pcap.out. */
+  private File getOutDir(String targetDir) {
+    return ensureDir(new File(new File(targetDir), DATA_DIR));
+  }
+
+  /**
+   * Returns {@code <targetDir>/pcap/query}, creating it if needed. It is passed as the second Path
+   * argument to PcapJob.query (presumably the job's working/output directory).
+   */
+  private File getQueryDir(String targetDir) {
+    return ensureDir(new File(new File(targetDir), QUERY_DIR));
+  }
+
+  /** Creates {@code dir} (and any missing parents) if it does not exist, then returns it. */
+  private static File ensureDir(File dir) {
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+    return dir;
+  }
+
+  /**
+   * Deletes the entries directly under {@code outDir}. Non-empty subdirectories survive, because
+   * File.delete() fails on them. A missing or unreadable directory is a no-op rather than an NPE.
+   */
+  private static void clearOutDir(File outDir) {
+    File[] entries = outDir.listFiles();
+    if (entries == null) {
+      return;
+    }
+    for (File f : entries) {
+      f.delete();
+    }
+  }
+
+  /**
+   * Counts the entries in {@code outDir}, ignoring ".crc" files (presumably the checksum side files
+   * written by the local Hadoop filesystem). Returns 0 if the directory cannot be listed.
+   */
+  private static int numFiles(File outDir) {
+    String[] names = outDir.list((dir, name) -> !name.endsWith(".crc"));
+    return names == null ? 0 : names.length;
+  }
+
+  /**
+   * Reads the sample SequenceFile of pcaps as (8-byte big-endian timestamp key, pcap bytes) pairs.
+   *
+   * @param pcapFile    SequenceFile with IntWritable keys and BytesWritable pcap values
+   * @param withHeaders if true, each value keeps its pcap global and packet headers; otherwise they
+   *                    are stripped and only the raw packet bytes are kept
+   * @return the entries, truncated to an even count
+   */
+  private static Iterable<Map.Entry<byte[], byte[]>> readPcaps(Path pcapFile, boolean withHeaders) throws IOException {
+    List<Map.Entry<byte[], byte[]>> ret = new ArrayList<>();
+    try (SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(), SequenceFile.Reader.file(pcapFile))) {
+      IntWritable key = new IntWritable();
+      BytesWritable value = new BytesWritable();
+      while (reader.next(key, value)) {
+        byte[] pcapWithHeader = value.copyBytes();
+        long calculatedTs = PcapHelper.getTimestamp(pcapWithHeader);
+        // Sanity check: every packet decoded from the record must carry the record's timestamp.
+        for (PacketInfo pi : PcapHelper.toPacketInfo(pcapWithHeader)) {
+          Assert.assertEquals(calculatedTs, pi.getPacketTimeInNanos());
+          //If you are debugging and want to see the packets, uncomment the following.
+          //System.out.println( Long.toUnsignedString(calculatedTs) + " => " + pi.getJsonDoc());
+        }
+        byte[] payload;
+        if (withHeaders) {
+          payload = pcapWithHeader;
+        } else {
+          int headerSize = PcapHelper.GLOBAL_HEADER_SIZE + PcapHelper.PACKET_HEADER_SIZE;
+          payload = Arrays.copyOfRange(pcapWithHeader, headerSize, pcapWithHeader.length);
+        }
+        ret.add(new AbstractMap.SimpleImmutableEntry<>(Bytes.toBytes(calculatedTs), payload));
+      }
+    }
+    // Keep an even number of entries: the topology is configured with kafka.pcap.numPackets=2 and
+    // the readiness check expects exactly size/2 output files.
+    return Iterables.limit(ret, 2 * (ret.size() / 2));
+  }
+
+  @Test
+  public void testTimestampInPacket() throws Exception {
+    testTopology(props -> {
+          props.setProperty("kafka.pcap.ts_scheme", TimestampScheme.FROM_PACKET.toString());
+          return null;
+        }
+        , (kafkaComponent, pcapEntries) -> kafkaComponent.writeMessages( KAFKA_TOPIC
+                                                                       , Collections2.transform(pcapEntries
+                                                                                               , input -> input.getValue()
+                                                                                               )
+                                                                       )
+        , true
+    );
+  }
+
+  @Test
+  public void testTimestampInKey() throws Exception {
+    testTopology(props -> {
+          props.setProperty("kafka.pcap.ts_scheme", TimestampScheme.FROM_KEY.toString());
+          return null;
+        }
+        , (kafkaComponent, pcapEntries) -> {
+          Producer<byte[], byte[]> producer = kafkaComponent.createProducer(byte[].class, byte[].class);
+          KafkaUtil.send(producer, pcapEntries, KAFKA_TOPIC, 2);
+          System.out.println("Sent pcap data: " + pcapEntries.size());
+          // Read back one message per sent entry before letting the test continue.
+          ConsumerIterator<?, ?> it = kafkaComponent.getStreamIterator(KAFKA_TOPIC);
+          int numMessages = 0;
+          for (int i = 0; i < pcapEntries.size(); ++i) {
+            it.next();
+            numMessages++;
+          }
+          Assert.assertEquals(pcapEntries.size(), numMessages);
+          System.out.println("Wrote " + pcapEntries.size() + " to kafka");
+        }
+        , false
+    );
+  }
+
+  /** Decodes the big-endian long timestamp stored as the key of {@code entries.get(offset)}. */
+  private static long getTimestamp(int offset, List<Map.Entry<byte[], byte[]>> entries) {
+    return Bytes.toLong(entries.get(offset).getKey());
+  }
+
+  /** Strategy for publishing the sample pcap entries to Kafka. */
+  private interface SendEntries {
+    void send(KafkaWithZKComponent kafkaComponent, List<Map.Entry<byte[], byte[]>> entries) throws Exception;
+  }
+
+  /** Runs PcapJob.query over the topology output with fresh Hadoop configuration objects. */
+  private static List<byte[]> query(PcapJob job
+                                   , File outDir
+                                   , File queryDir
+                                   , long begin
+                                   , long end
+                                   , EnumMap<Constants.Fields, String> fields
+                                   ) throws Exception
+  {
+    return job.query(new Path(outDir.getAbsolutePath())
+                    , new Path(queryDir.getAbsolutePath())
+                    , begin
+                    , end
+                    , fields
+                    , new Configuration()
+                    , FileSystem.get(new Configuration())
+                    );
+  }
+
+  /** Builds a single-field query. */
+  private static EnumMap<Constants.Fields, String> queryFields(Constants.Fields field, String value) {
+    EnumMap<Constants.Fields, String> fields = new EnumMap<>(Constants.Fields.class);
+    fields.put(field, value);
+    return fields;
+  }
+
+  /**
+   * Runs the pcap topology end to end. It starts MR, Kafka and Storm, publishes the sample pcaps,
+   * waits for the HDFS writer to flush them, then checks several PcapJob queries against them.
+   *
+   * @param updatePropertiesCallback mutates the topology properties before submission
+   * @param sendPcapEntriesCallback  publishes the sample entries to Kafka
+   * @param withHeaders              whether the sample entries keep their pcap headers
+   */
+  public void testTopology(Function<Properties, Void> updatePropertiesCallback
+                          , SendEntries sendPcapEntriesCallback
+                          , boolean withHeaders
+                          ) throws Exception
+  {
+    if (!new File(topologiesDir).exists()) {
+      topologiesDir = UnitTestHelper.findDir("topologies");
+    }
+    targetDir = UnitTestHelper.findDir("target");
+    // Check before use: both values are dereferenced immediately below.
+    Assert.assertNotNull(topologiesDir);
+    Assert.assertNotNull(targetDir);
+    final File outDir = getOutDir(targetDir);
+    final File queryDir = getQueryDir(targetDir);
+    clearOutDir(outDir);
+    clearOutDir(queryDir);
+
+    File baseDir = new File(new File(targetDir), BASE_DIR);
+    Path pcapFile = new Path("../metron-integration-test/src/main/resources/sample/data/SampleInput/PCAPExampleOutput");
+    final List<Map.Entry<byte[], byte[]>> pcapEntries = Lists.newArrayList(readPcaps(pcapFile, withHeaders));
+    Assert.assertFalse(pcapEntries.isEmpty());
+
+    final Properties topologyProperties = new Properties();
+    topologyProperties.setProperty("spout.kafka.topic.pcap", KAFKA_TOPIC);
+    topologyProperties.setProperty("kafka.pcap.start", "BEGINNING");
+    topologyProperties.setProperty("kafka.pcap.out", outDir.getAbsolutePath());
+    topologyProperties.setProperty("kafka.pcap.numPackets", "2");
+    topologyProperties.setProperty("kafka.pcap.maxTimeMS", "200000000");
+    topologyProperties.setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
+    updatePropertiesCallback.apply(topologyProperties);
+
+    final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
+      add(new KafkaWithZKComponent.Topic(KAFKA_TOPIC, 1));
+    }})
+        // The ZooKeeper address is only available after Kafka starts, so kafka.zk is filled in here.
+        .withPostStartCallback(new Function<KafkaWithZKComponent, Void>() {
+          @Nullable
+          @Override
+          public Void apply(@Nullable KafkaWithZKComponent kafkaWithZKComponent) {
+            topologyProperties.setProperty("kafka.zk", kafkaWithZKComponent.getZookeeperConnect());
+            return null;
+          }
+        });
+
+    final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath());
+
+    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
+        .withTopologyLocation(new File(topologiesDir + "/pcap/remote.yaml"))
+        .withTopologyName("pcap")
+        .withTopologyProperties(topologyProperties)
+        .build();
+    UnitTestHelper.verboseLogging();
+    ComponentRunner runner = new ComponentRunner.Builder()
+        .withComponent("mr", mr)
+        .withComponent("kafka", kafkaComponent)
+        .withComponent("storm", fluxComponent)
+        .withMaxTimeMS(-1)
+        .withMillisecondsBetweenAttempts(2000)
+        .withNumRetries(10)
+        .build();
+    try {
+      runner.start();
+      System.out.println("Components started...");
+
+      fluxComponent.submitTopology();
+      sendPcapEntriesCallback.send(kafkaComponent, pcapEntries);
+      runner.process(new Processor<Void>() {
+        @Override
+        public ReadinessState process(ComponentRunner runner) {
+          // Ready once one (non-.crc) output file exists per two input entries.
+          int expectedNumFiles = pcapEntries.size() / 2;
+          return numFiles(outDir) == expectedNumFiles ? ReadinessState.READY : ReadinessState.NOT_READY;
+        }
+
+        @Override
+        public Void getResult() {
+          return null;
+        }
+      });
+      PcapJob job = new PcapJob();
+      {
+        // Querying [ts(4), ts(5)] with no field filters should return exactly two pcaps.
+        List<byte[]> results = query(job, outDir, queryDir
+                                    , getTimestamp(4, pcapEntries)
+                                    , getTimestamp(5, pcapEntries)
+                                    , new EnumMap<>(Constants.Fields.class)
+                                    );
+        Assert.assertEquals(2, results.size());
+      }
+      {
+        // A destination IP address that is not in the dataset matches nothing.
+        List<byte[]> results = query(job, outDir, queryDir
+                                    , getTimestamp(0, pcapEntries)
+                                    , getTimestamp(1, pcapEntries)
+                                    , queryFields(Constants.Fields.DST_ADDR, "207.28.210.1")
+                                    );
+        Assert.assertEquals(0, results.size());
+      }
+      {
+        // Likewise, an unknown protocol matches nothing.
+        List<byte[]> results = query(job, outDir, queryDir
+                                    , getTimestamp(0, pcapEntries)
+                                    , getTimestamp(1, pcapEntries)
+                                    , queryFields(Constants.Fields.PROTOCOL, "foo")
+                                    );
+        Assert.assertEquals(0, results.size());
+      }
+      {
+        // A window covering every entry with no field filters returns all of them.
+        List<byte[]> results = query(job, outDir, queryDir
+                                    , getTimestamp(0, pcapEntries)
+                                    , getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1
+                                    , new EnumMap<>(Constants.Fields.class)
+                                    );
+        Assert.assertEquals(pcapEntries.size(), results.size());
+      }
+      {
+        // Filtering on destination port 22 returns exactly the packets a direct scan finds.
+        List<byte[]> results = query(job, outDir, queryDir
+                                    , getTimestamp(0, pcapEntries)
+                                    , getTimestamp(pcapEntries.size() - 1, pcapEntries) + 1
+                                    , queryFields(Constants.Fields.DST_PORT, "22")
+                                    );
+        Assert.assertTrue(results.size() > 0);
+        Iterable<JSONObject> expected = filterPcaps(pcapEntries, json -> {
+          Object prt = json.get(Constants.Fields.DST_PORT.getName());
+          return prt != null && prt.toString().equals("22");
+        }, withHeaders);
+        Assert.assertEquals(Iterables.size(expected), results.size());
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        PcapMerger.merge(baos, results);
+        Assert.assertTrue(baos.toByteArray().length > 0);
+      }
+      System.out.println("Ended");
+    } finally {
+      runner.stop();
+      clearOutDir(outDir);
+      clearOutDir(queryDir);
+    }
+  }
+
+  /** Decodes a full pcap (global header + packet header + data) into its packets' JSON documents. */
+  public static final Function<byte[], Iterable<JSONObject>> TO_JSONS = input -> {
+    try {
+      return PcapHelper.toJSON(PcapHelper.toPacketInfo(input));
+    } catch (IOException e) {
+      throw new RuntimeException(e.getMessage(), e);
+    }
+  };
+
+  /**
+   * Decodes every entry to JSON packet documents and keeps those matching {@code predicate}.
+   * Entries without headers are first rewrapped with packet and global headers in native byte
+   * order, using the key as the packet timestamp.
+   */
+  private Iterable<JSONObject> filterPcaps(Iterable<Map.Entry<byte[], byte[]>> pcaps
+                                          , Predicate<JSONObject> predicate
+                                          , boolean withHeaders
+                                          )
+  {
+    final Function<Map.Entry<byte[], byte[]>, byte[]> pcapTransform;
+    if (withHeaders) {
+      pcapTransform = kv -> kv.getValue();
+    } else {
+      final Endianness endianness = Endianness.getNativeEndianness();
+      pcapTransform = kv -> PcapHelper.addGlobalHeader(PcapHelper.addPacketHeader(Bytes.toLong(kv.getKey())
+                                                                                 , kv.getValue()
+                                                                                 , endianness
+                                                                                 )
+                                                      , endianness
+                                                      );
+    }
+    Iterable<byte[]> fullPcaps = Iterables.transform(pcaps, pcapTransform);
+    Iterable<JSONObject> packets = Iterables.concat(Iterables.transform(fullPcaps, TO_JSONS));
+    return Iterables.filter(packets, predicate);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
index 7125382..6a45617 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -99,5 +99,25 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common</artifactId>
+ <version>0.1BETA</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${global_hbase_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Endianness.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Endianness.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Endianness.java
new file mode 100644
index 0000000..4a6de4d
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/Endianness.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.spout.pcap;
+
+import java.nio.ByteOrder;
+
+public enum Endianness {
+ LITTLE, BIG;
+
+ public static Endianness getNativeEndianness() {
+ if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
+ return BIG;
+ } else {
+ return LITTLE;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
new file mode 100644
index 0000000..78f6fd4
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/PcapHelper.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.metron.spout.pcap.Endianness;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.krakenapps.pcap.decoder.ethernet.EthernetType;
+import org.krakenapps.pcap.decoder.ip.IpDecoder;
+import org.krakenapps.pcap.decoder.ip.Ipv4Packet;
+import org.krakenapps.pcap.decoder.tcp.TcpPacket;
+import org.krakenapps.pcap.decoder.udp.UdpPacket;
+import org.krakenapps.pcap.file.GlobalHeader;
+import org.krakenapps.pcap.packet.PacketHeader;
+import org.krakenapps.pcap.packet.PcapPacket;
+import org.krakenapps.pcap.util.Buffer;
+import org.krakenapps.pcap.util.ByteOrderConverter;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+
+import static org.apache.metron.pcap.Constants.*;
+
+public class PcapHelper {
+
+ public static final int PACKET_HEADER_SIZE = 4*Integer.BYTES;
+ public static final int GLOBAL_HEADER_SIZE = 24;
+ private static final Logger LOG = Logger.getLogger(PcapHelper.class);
+ public static ThreadLocal<MetronEthernetDecoder> ETHERNET_DECODER = new ThreadLocal<MetronEthernetDecoder>() {
+ @Override
+ protected MetronEthernetDecoder initialValue() {
+ return createDecoder();
+ }
+ };
+
+ public static Long getTimestamp(String filename) {
+ try {
+ return Long.parseUnsignedLong(Iterables.get(Splitter.on('_').split(filename), 2));
+ }
+ catch(Exception e) {
+ //something went wrong here.
+ return null;
+ }
+ }
+
+ public static String toFilename(String topic, long timestamp, String partition, String uuid)
+ {
+ return Joiner.on("_").join("pcap"
+ ,topic
+ , Long.toUnsignedString(timestamp)
+ ,partition
+ , uuid
+ );
+ }
+
+ public static boolean swapBytes(org.apache.metron.spout.pcap.Endianness endianness) {
+ return endianness == org.apache.metron.spout.pcap.Endianness.LITTLE;
+ }
+
+ public static byte[] getPcapGlobalHeader(Endianness endianness) {
+ if(swapBytes(endianness)) {
+ //swap
+ return new byte[] {
+ (byte) 0xd4, (byte) 0xc3, (byte) 0xb2, (byte) 0xa1 //swapped magic number 0xa1b2c3d4
+ , 0x02, 0x00 //swapped major version 2
+ , 0x04, 0x00 //swapped minor version 4
+ , 0x00, 0x00, 0x00, 0x00 //GMT to local tz offset (= 0)
+ , 0x00, 0x00, 0x00, 0x00 //sigfigs (= 0)
+ , (byte) 0xff, (byte) 0xff, 0x00, 0x00 //snaplen (=65535)
+ , 0x01, 0x00, 0x00, 0x00 // swapped link layer header type (1 = ethernet)
+ };
+ }
+ else {
+ //no need to swap
+ return new byte[] {
+ (byte) 0xa1, (byte) 0xb2, (byte) 0xc3, (byte) 0xd4 //magic number 0xa1b2c3d4
+ , 0x00, 0x02 //major version 2
+ , 0x00, 0x04 //minor version 4
+ , 0x00, 0x00, 0x00, 0x00 //GMT to local tz offset (= 0)
+ , 0x00, 0x00, 0x00, 0x00 //sigfigs (= 0)
+ , 0x00, 0x00, (byte) 0xff, (byte) 0xff //snaplen (=65535)
+ , 0x00, 0x00, 0x00, 0x01 // link layer header type (1 = ethernet)
+ };
+ }
+ }
+
+ public static Long getTimestamp(byte[] pcap) {
+ PcapByteInputStream pcapByteInputStream = null;
+ try {
+ pcapByteInputStream = new PcapByteInputStream(pcap);
+ PcapPacket packet = pcapByteInputStream.getPacket();
+ GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
+ PacketHeader packetHeader = packet.getPacketHeader();
+ if ( globalHeader.getMagicNumber() == 0xa1b2c3d4 || globalHeader.getMagicNumber() == 0xd4c3b2a1 )
+ {
+ //Time is in micro assemble as nano
+ LOG.info("Times are in micro according to the magic number");
+ return packetHeader.getTsSec() * 1000000000L + packetHeader.getTsUsec() * 1000L ;
+ }
+ else if ( globalHeader.getMagicNumber() == 0xa1b23c4d || globalHeader.getMagicNumber() == 0x4d3cb2a1 ) {
+ //Time is in nano assemble as nano
+ LOG.info("Times are in nano according to the magic number");
+ return packetHeader.getTsSec() * 1000000000L + packetHeader.getTsUsec() ;
+ }
+ //Default assume time is in micro assemble as nano
+ LOG.warn("Unknown magic number. Defaulting to micro");
+ return packetHeader.getTsSec() * 1000000000L + packetHeader.getTsUsec() * 1000L ;
+ }
+ catch(IOException ioe) {
+ //we cannot read the packet, so we return null here.
+ LOG.error("Unable to read packet", ioe);
+ }
+ finally {
+ if(pcapByteInputStream != null) {
+ try {
+ pcapByteInputStream.close();
+ } catch (IOException e) {
+ LOG.error("Unable to close stream", e);
+ }
+ }
+ }
+ return null;
+ }
+ public static byte[] addGlobalHeader(byte[] packet, Endianness endianness) {
+ byte[] globalHeader = getPcapGlobalHeader(endianness);
+ byte[] ret = new byte[packet.length + GLOBAL_HEADER_SIZE];
+ int offset = 0;
+ System.arraycopy(globalHeader, 0, ret, offset, GLOBAL_HEADER_SIZE);
+ offset += globalHeader.length;
+ System.arraycopy(packet, 0, ret, offset, packet.length);
+ return ret;
+ }
+
+ public static byte[] addPacketHeader(long tsNano, byte[] packet, Endianness endianness) {
+ boolean swapBytes = swapBytes(endianness);
+ long micros = Long.divideUnsigned(tsNano, 1000);
+ int secs = (int)(micros / 1000000);
+ int usec = (int)(micros % 1000000);
+ int capLen = packet.length;
+ byte[] ret = new byte[PACKET_HEADER_SIZE + packet.length];
+ int offset = 0;
+ {
+ byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(secs):secs);
+ System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+ offset += Integer.BYTES;
+ }
+ {
+ byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(usec):usec);
+ System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+ offset += Integer.BYTES;
+ }
+ {
+ byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+ System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+ offset += Integer.BYTES;
+ }
+ {
+ byte[] b = Bytes.toBytes(swapBytes?ByteOrderConverter.swap(capLen):capLen);
+ System.arraycopy(b, 0, ret, offset, Integer.BYTES);
+ offset += Integer.BYTES;
+ }
+ System.arraycopy(packet, 0, ret, offset, packet.length);
+ return ret;
+ }
+ public static EnumMap<org.apache.metron.common.Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ EnumMap<org.apache.metron.common.Constants.Fields, Object> ret = new EnumMap(org.apache.metron.common.Constants.Fields.class);
+ if(pi.getTcpPacket() != null) {
+ if(pi.getTcpPacket().getSourceAddress() != null) {
+ ret.put(org.apache.metron.common.Constants.Fields.SRC_ADDR, pi.getTcpPacket().getSourceAddress().getHostAddress());
+ }
+ if(pi.getTcpPacket().getSource() != null ) {
+ ret.put(org.apache.metron.common.Constants.Fields.SRC_PORT, pi.getTcpPacket().getSource().getPort());
+ }
+ if(pi.getTcpPacket().getDestinationAddress() != null ) {
+ ret.put(org.apache.metron.common.Constants.Fields.DST_ADDR, pi.getTcpPacket().getDestinationAddress().getHostAddress());
+ }
+ if(pi.getTcpPacket().getDestination() != null ) {
+ ret.put(org.apache.metron.common.Constants.Fields.DST_PORT, pi.getTcpPacket().getDestination().getPort());
+ }
+ if(pi.getIpv4Packet() != null) {
+ ret.put(org.apache.metron.common.Constants.Fields.PROTOCOL, pi.getIpv4Packet().getProtocol());
+ }
+ }
+ return ret;
+ }
+
+ public static List<PacketInfo> toPacketInfo(byte[] packet) throws IOException {
+ return toPacketInfo(ETHERNET_DECODER.get(), packet);
+ }
+ public static MetronEthernetDecoder createDecoder() {
+ MetronEthernetDecoder ethernetDecoder = new MetronEthernetDecoder();
+ IpDecoder ipDecoder = new IpDecoder();
+ ethernetDecoder.register(EthernetType.IPV4, ipDecoder);
+ return ethernetDecoder;
+ }
+
+/**
+ * Parses the.
+ *
+ * @param pcap
+ * the pcap
+ * @return the list * @throws IOException Signals that an I/O exception has
+ * occurred. * @throws IOException * @throws IOException * @throws
+ * IOException
+ * @throws IOException
+ * Signals that an I/O exception has occurred.
+ */
+ public static List<PacketInfo> toPacketInfo(MetronEthernetDecoder decoder, byte[] pcap) throws IOException {
+ List<PacketInfo> packetInfoList = new ArrayList<>();
+
+ PcapByteInputStream pcapByteInputStream = new PcapByteInputStream(pcap);
+
+ GlobalHeader globalHeader = pcapByteInputStream.getGlobalHeader();
+ while (true) {
+ try
+
+ {
+ PcapPacket packet = pcapByteInputStream.getPacket();
+ // int packetCounter = 0;
+ // PacketHeader packetHeader = null;
+ // Ipv4Packet ipv4Packet = null;
+ TcpPacket tcpPacket = null;
+ UdpPacket udpPacket = null;
+ // Buffer packetDataBuffer = null;
+ int sourcePort = 0;
+ int destinationPort = 0;
+
+ // LOG.trace("Got packet # " + ++packetCounter);
+
+ // LOG.trace(packet.getPacketData());
+ decoder.decode(packet);
+
+ PacketHeader packetHeader = packet.getPacketHeader();
+ Ipv4Packet ipv4Packet = Ipv4Packet.parse(packet.getPacketData());
+
+ if (ipv4Packet.getProtocol() == Constants.PROTOCOL_TCP) {
+ tcpPacket = TcpPacket.parse(ipv4Packet);
+
+ }
+
+ if (ipv4Packet.getProtocol() == Constants.PROTOCOL_UDP) {
+
+ Buffer packetDataBuffer = ipv4Packet.getData();
+ sourcePort = packetDataBuffer.getUnsignedShort();
+ destinationPort = packetDataBuffer.getUnsignedShort();
+
+ udpPacket = new UdpPacket(ipv4Packet, sourcePort, destinationPort);
+
+ udpPacket.setLength(packetDataBuffer.getUnsignedShort());
+ udpPacket.setChecksum(packetDataBuffer.getUnsignedShort());
+ packetDataBuffer.discardReadBytes();
+ udpPacket.setData(packetDataBuffer);
+ }
+
+ packetInfoList.add(new PacketInfo(globalHeader, packetHeader, packet,
+ ipv4Packet, tcpPacket, udpPacket));
+ } catch (NegativeArraySizeException ignored) {
+ LOG.debug("Ignorable exception while parsing packet.", ignored);
+ } catch (EOFException eof) { // $codepro.audit.disable logExceptions
+ // Ignore exception and break
+ break;
+ }
+ }
+ return packetInfoList;
+ }
+
+ public static List<JSONObject> toJSON(List<PacketInfo> packetInfoList) {
+ List<JSONObject> messages = new ArrayList<>();
+ for (PacketInfo packetInfo : packetInfoList) {
+ JSONObject message = (JSONObject) JSONValue.parse(packetInfo.getJsonIndexDoc());
+ messages.add(message);
+ }
+ return messages;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/28c250d1/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
new file mode 100644
index 0000000..2952a0a
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.mr;
+
+import com.google.common.base.Predicate;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+
+import javax.annotation.Nullable;
+import java.util.EnumMap;
+import java.util.Map;
+
+
+public class PcapFilter implements Predicate<PacketInfo> {
+
+ private String srcAddr;
+ private Integer srcPort;
+ private String dstAddr;
+ private Integer dstPort;
+ private String protocol;
+ private boolean includesReverseTraffic = false;
+
+
+ public PcapFilter(Iterable<Map.Entry<String, String>> config) {
+ for(Map.Entry<String, String> kv : config) {
+ if(kv.getKey().equals(Constants.Fields.DST_ADDR.getName())) {
+ this.dstAddr = kv.getValue();
+ }
+ if(kv.getKey().equals(Constants.Fields.SRC_ADDR.getName())) {
+ this.srcAddr = kv.getValue();
+ }
+ if(kv.getKey().equals(Constants.Fields.DST_PORT.getName())) {
+ this.dstPort = Integer.parseInt(kv.getValue());
+ }
+ if(kv.getKey().equals(Constants.Fields.SRC_PORT.getName())) {
+ this.srcPort = Integer.parseInt(kv.getValue());
+ }
+ if(kv.getKey().equals(Constants.Fields.PROTOCOL.getName())) {
+ this.protocol= kv.getValue();
+ }
+ if(kv.getKey().equals(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())) {
+ this.includesReverseTraffic = Boolean.parseBoolean(kv.getValue());
+ }
+ }
+ }
+
+ private boolean matchSourceAndDestination(Object srcAddrObj
+ , Object srcPortObj
+ , Object dstAddrObj
+ , Object dstPortObj
+ )
+ {
+ boolean isMatch = true;
+ if(srcAddr != null ) {
+ Object o = srcAddrObj;
+ isMatch &= o != null && o instanceof String && ((String)o).equals(srcAddr);
+ }
+ if(isMatch && srcPort != null ) {
+ Object o = srcPortObj;
+ isMatch &= o != null && o.toString().equals(srcPort.toString());
+ }
+ if(isMatch && dstAddr != null ) {
+ Object o = dstAddrObj;
+ isMatch &= o != null && o instanceof String && ((String)o).equals(dstAddr);
+ }
+ if(isMatch && dstPort != null) {
+ Object o = dstPortObj;
+ isMatch &= o != null && o.toString().equals(dstPort.toString());
+ }
+ return isMatch;
+ }
+
+
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return PcapHelper.packetToFields(pi);
+ }
+
+ @Override
+ public boolean apply(@Nullable PacketInfo pi ) {
+ boolean isMatch = true;
+ EnumMap<Constants.Fields, Object> input= packetToFields(pi);
+ Object srcAddrObj = input.get(Constants.Fields.SRC_ADDR);
+ Object srcPortObj = input.get(Constants.Fields.SRC_PORT);
+ Object dstAddrObj = input.get(Constants.Fields.DST_ADDR);
+ Object dstPortObj = input.get(Constants.Fields.DST_PORT);
+ Object protocolObj = input.get(Constants.Fields.PROTOCOL);
+
+ //first we ensure the protocol matches if you pass one in
+ if(isMatch && protocol != null ) {
+ Object o = protocolObj;
+ isMatch &= o != null && o.toString().equals(protocol);
+ }
+ if(isMatch) {
+ //if we're still a match, then we try to match the source and destination
+ isMatch &= matchSourceAndDestination(srcAddrObj, srcPortObj, dstAddrObj, dstPortObj);
+ if (!isMatch && includesReverseTraffic) {
+ isMatch = true;
+ //then we have to try the other direction if that the forward direction isn't a match
+ isMatch &= matchSourceAndDestination(dstAddrObj, dstPortObj, srcAddrObj, srcPortObj);
+ }
+ }
+ return isMatch;
+ }
+}