You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/19 13:18:19 UTC
[1/2] incubator-metron git commit: METRON-155 Added query filtering
capability for PCAP via Metron REST API (mmiklavcic via cestella) closes
apache/incubator-metron#119
Repository: incubator-metron
Updated Branches:
refs/heads/master df8d682e8 -> 3803df2f3
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
new file mode 100644
index 0000000..ea46cc3
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilter.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter.fixed;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.query.VariableResolver;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.PcapFilter;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
+import org.apache.metron.pcap.filter.PcapFilters;
+import org.apache.metron.pcap.filter.PcapFieldResolver;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+
+public class FixedPcapFilter implements PcapFilter {
+
+ public static class Configurator implements PcapFilterConfigurator<EnumMap<Constants.Fields, String>> {
+ @Override
+ public void addToConfig(EnumMap<Constants.Fields, String> fields, Configuration conf) {
+ for (Map.Entry<Constants.Fields, String> kv : fields.entrySet()) {
+ conf.set(kv.getKey().getName(), kv.getValue());
+ }
+ conf.set(PCAP_FILTER_NAME_CONF, PcapFilters.FIXED.name());
+ }
+
+ @Override
+ public String queryToString(EnumMap<Constants.Fields, String> fields) {
+ return (fields == null ? "" : Joiner.on("_").join(fields.values()));
+ }
+ }
+
+ private String srcAddr;
+ private Integer srcPort;
+ private String dstAddr;
+ private Integer dstPort;
+ private String protocol;
+ private boolean includesReverseTraffic = false;
+
+ @Override
+ public void configure(Iterable<Map.Entry<String, String>> config) {
+ for (Map.Entry<String, String> kv : config) {
+ if (kv.getKey().equals(Constants.Fields.DST_ADDR.getName())) {
+ this.dstAddr = kv.getValue();
+ }
+ if (kv.getKey().equals(Constants.Fields.SRC_ADDR.getName())) {
+ this.srcAddr = kv.getValue();
+ }
+ if (kv.getKey().equals(Constants.Fields.DST_PORT.getName())) {
+ this.dstPort = Integer.parseInt(kv.getValue());
+ }
+ if (kv.getKey().equals(Constants.Fields.SRC_PORT.getName())) {
+ this.srcPort = Integer.parseInt(kv.getValue());
+ }
+ if (kv.getKey().equals(Constants.Fields.PROTOCOL.getName())) {
+ this.protocol = kv.getValue();
+ }
+ if (kv.getKey().equals(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())) {
+ this.includesReverseTraffic = Boolean.parseBoolean(kv.getValue());
+ }
+ }
+ }
+
+
+ @Override
+ public boolean test(PacketInfo pi) {
+ VariableResolver resolver = new PcapFieldResolver(packetToFields(pi));
+ String srcAddrIn = resolver.resolve(Constants.Fields.SRC_ADDR.getName());
+ String srcPortIn = resolver.resolve(Constants.Fields.SRC_PORT.getName());
+ String dstAddrIn = resolver.resolve(Constants.Fields.DST_ADDR.getName());
+ String dstPortIn = resolver.resolve(Constants.Fields.DST_PORT.getName());
+ String protocolIn = resolver.resolve(Constants.Fields.PROTOCOL.getName());
+
+ if (areMatch(protocol, protocolIn)) {
+ if (matchesSourceAndDestination(srcAddrIn, srcPortIn, dstAddrIn, dstPortIn)) {
+ return true;
+ } else if (includesReverseTraffic) {
+ return matchesReverseSourceAndDestination(srcAddrIn, srcPortIn, dstAddrIn, dstPortIn);
+ }
+ }
+ return false;
+ }
+
+ private boolean areMatch(Integer filter, String input) {
+ return filter == null || areMatch(filter.toString(), input);
+ }
+
+ private boolean areMatch(String filter, String input) {
+ if (filter != null) {
+ return input != null && input.equals(filter);
+ } else {
+ return true;
+ }
+ }
+
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return PcapHelper.packetToFields(pi);
+ }
+
+ private boolean matchesSourceAndDestination(String srcAddrComp,
+ String srcPortComp,
+ String dstAddrComp,
+ String dstPortComp) {
+ boolean isMatch = true;
+ isMatch &= areMatch(this.srcAddr, srcAddrComp);
+ isMatch &= areMatch(this.srcPort, srcPortComp);
+ isMatch &= areMatch(this.dstAddr, dstAddrComp);
+ isMatch &= areMatch(this.dstPort, dstPortComp);
+ return isMatch;
+ }
+
+ private boolean matchesReverseSourceAndDestination(String srcAddr,
+ String srcPort,
+ String dstAddr,
+ String dstPort) {
+ return matchesSourceAndDestination(dstAddr, dstPort, srcAddr, srcPort);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
new file mode 100644
index 0000000..e97ed82
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/query/QueryPcapFilter.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter.query;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.query.PredicateProcessor;
+import org.apache.metron.common.query.VariableResolver;
+import org.apache.metron.pcap.PacketInfo;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.PcapFieldResolver;
+import org.apache.metron.pcap.filter.PcapFilter;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
+import org.apache.metron.pcap.filter.PcapFilters;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+public class QueryPcapFilter implements PcapFilter {
+ public static final String QUERY_STR_CONFIG = "mql";
+
+ public static class Configurator implements PcapFilterConfigurator<String> {
+ @Override
+ public void addToConfig(String query, Configuration conf) {
+ conf.set(QUERY_STR_CONFIG, query);
+ conf.set(PCAP_FILTER_NAME_CONF, PcapFilters.QUERY.name());
+ }
+
+ @Override
+ public String queryToString(String fields) {
+ return (fields == null ? "" :
+ fields.trim().replaceAll("\\s", "_")
+ .replace(".", "-")
+ .replace("'", "")
+ );
+ }
+ }
+
+ private String queryString = null;
+ private PredicateProcessor predicateProcessor = new PredicateProcessor();
+
+ @Override
+ public void configure(Iterable<Map.Entry<String, String>> config) {
+ for (Map.Entry<String, String> entry : config) {
+ if (entry.getKey().equals(QUERY_STR_CONFIG)) {
+ queryString = entry.getValue();
+ }
+ }
+ predicateProcessor.validate(queryString);
+ }
+
+ @Override
+ public boolean test(PacketInfo input) {
+ EnumMap<Constants.Fields, Object> fields = packetToFields(input);
+ VariableResolver resolver = new PcapFieldResolver(fields);
+ return predicateProcessor.parse(queryString, resolver);
+ }
+
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return PcapHelper.packetToFields(pi);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
deleted file mode 100644
index 2952a0a..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapFilter.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap.mr;
-
-import com.google.common.base.Predicate;
-import org.apache.metron.common.Constants;
-import org.apache.metron.pcap.PacketInfo;
-import org.apache.metron.pcap.PcapHelper;
-
-import javax.annotation.Nullable;
-import java.util.EnumMap;
-import java.util.Map;
-
-
-public class PcapFilter implements Predicate<PacketInfo> {
-
- private String srcAddr;
- private Integer srcPort;
- private String dstAddr;
- private Integer dstPort;
- private String protocol;
- private boolean includesReverseTraffic = false;
-
-
- public PcapFilter(Iterable<Map.Entry<String, String>> config) {
- for(Map.Entry<String, String> kv : config) {
- if(kv.getKey().equals(Constants.Fields.DST_ADDR.getName())) {
- this.dstAddr = kv.getValue();
- }
- if(kv.getKey().equals(Constants.Fields.SRC_ADDR.getName())) {
- this.srcAddr = kv.getValue();
- }
- if(kv.getKey().equals(Constants.Fields.DST_PORT.getName())) {
- this.dstPort = Integer.parseInt(kv.getValue());
- }
- if(kv.getKey().equals(Constants.Fields.SRC_PORT.getName())) {
- this.srcPort = Integer.parseInt(kv.getValue());
- }
- if(kv.getKey().equals(Constants.Fields.PROTOCOL.getName())) {
- this.protocol= kv.getValue();
- }
- if(kv.getKey().equals(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName())) {
- this.includesReverseTraffic = Boolean.parseBoolean(kv.getValue());
- }
- }
- }
-
- private boolean matchSourceAndDestination(Object srcAddrObj
- , Object srcPortObj
- , Object dstAddrObj
- , Object dstPortObj
- )
- {
- boolean isMatch = true;
- if(srcAddr != null ) {
- Object o = srcAddrObj;
- isMatch &= o != null && o instanceof String && ((String)o).equals(srcAddr);
- }
- if(isMatch && srcPort != null ) {
- Object o = srcPortObj;
- isMatch &= o != null && o.toString().equals(srcPort.toString());
- }
- if(isMatch && dstAddr != null ) {
- Object o = dstAddrObj;
- isMatch &= o != null && o instanceof String && ((String)o).equals(dstAddr);
- }
- if(isMatch && dstPort != null) {
- Object o = dstPortObj;
- isMatch &= o != null && o.toString().equals(dstPort.toString());
- }
- return isMatch;
- }
-
-
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return PcapHelper.packetToFields(pi);
- }
-
- @Override
- public boolean apply(@Nullable PacketInfo pi ) {
- boolean isMatch = true;
- EnumMap<Constants.Fields, Object> input= packetToFields(pi);
- Object srcAddrObj = input.get(Constants.Fields.SRC_ADDR);
- Object srcPortObj = input.get(Constants.Fields.SRC_PORT);
- Object dstAddrObj = input.get(Constants.Fields.DST_ADDR);
- Object dstPortObj = input.get(Constants.Fields.DST_PORT);
- Object protocolObj = input.get(Constants.Fields.PROTOCOL);
-
- //first we ensure the protocol matches if you pass one in
- if(isMatch && protocol != null ) {
- Object o = protocolObj;
- isMatch &= o != null && o.toString().equals(protocol);
- }
- if(isMatch) {
- //if we're still a match, then we try to match the source and destination
- isMatch &= matchSourceAndDestination(srcAddrObj, srcPortObj, dstAddrObj, dstPortObj);
- if (!isMatch && includesReverseTraffic) {
- isMatch = true;
- //then we have to try the other direction if that the forward direction isn't a match
- isMatch &= matchSourceAndDestination(dstAddrObj, dstPortObj, srcAddrObj, srcPortObj);
- }
- }
- return isMatch;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 0dc6dc9..3543b1d 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -18,12 +18,12 @@
package org.apache.metron.pcap.mr;
-import com.google.common.base.Function;
import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -33,41 +33,53 @@ import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.log4j.Logger;
-import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.PcapFilter;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
+import org.apache.metron.pcap.filter.PcapFilters;
-import javax.annotation.Nullable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
+import java.util.stream.Stream;
public class PcapJob {
private static final Logger LOG = Logger.getLogger(PcapJob.class);
-
public static class PcapMapper extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {
public static final String START_TS_CONF = "start_ts";
public static final String END_TS_CONF = "end_ts";
PcapFilter filter;
long start;
long end;
+
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- filter = new PcapFilter(context.getConfiguration());
+ filter = PcapFilters.valueOf(context.getConfiguration().get(PcapFilterConfigurator.PCAP_FILTER_NAME_CONF)).create();
+ filter.configure(context.getConfiguration());
start = Long.parseUnsignedLong(context.getConfiguration().get(START_TS_CONF));
end = Long.parseUnsignedLong(context.getConfiguration().get(END_TS_CONF));
}
@Override
protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
- if(Long.compareUnsigned(key.get() ,start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) {
- boolean send = Iterables.size(Iterables.filter(PcapHelper.toPacketInfo(value.copyBytes()), filter)) > 0;
+ if (Long.compareUnsigned(key.get(), start) >= 0 && Long.compareUnsigned(key.get(), end) <= 0) {
+ // It is assumed that the passed BytesWritable value is always a *single* PacketInfo object. Passing more than 1
+ // object will result in the whole set being passed through if any pass the filter. We cannot serialize PacketInfo
+ // objects back to byte arrays, otherwise we could support more than one packet.
+ // Note: short-circuit findAny() func on stream
+ boolean send = filteredPacketInfo(value).findAny().isPresent();
if (send) {
context.write(key, value);
}
}
}
+
+ private Stream<PacketInfo> filteredPacketInfo(BytesWritable value) throws IOException {
+ return PcapHelper.toPacketInfo(value.copyBytes()).stream().filter(filter);
+ }
}
public static class PcapReducer extends Reducer<LongWritable, BytesWritable, LongWritable, BytesWritable> {
@@ -154,26 +166,23 @@ public class PcapJob {
return ret;
}
- private static String queryToString(EnumMap<Constants.Fields, String> fields) {
- return Joiner.on("_").join(fields.values());
- }
-
- public List<byte[]> query(Path basePath
+ public <T> List<byte[]> query(Path basePath
, Path baseOutputPath
, long beginNS
, long endNS
- , EnumMap<Constants.Fields, String> fields
+ , T fields
, Configuration conf
, FileSystem fs
+ , PcapFilterConfigurator<T> filterImpl
) throws IOException, ClassNotFoundException, InterruptedException {
- String fileName = Joiner.on("_").join(beginNS, endNS, queryToString(fields), UUID.randomUUID().toString());
+ String fileName = Joiner.on("_").join(beginNS, endNS, filterImpl.queryToString(fields), UUID.randomUUID().toString());
if(LOG.isDebugEnabled()) {
DateFormat format = SimpleDateFormat.getDateTimeInstance( SimpleDateFormat.LONG
, SimpleDateFormat.LONG
);
String from = format.format(new Date(Long.divideUnsigned(beginNS, 1000000)));
String to = format.format(new Date(Long.divideUnsigned(endNS, 1000000)));
- LOG.debug("Executing query " + queryToString(fields) + " on timerange " + from + " to " + to);
+ LOG.debug("Executing query " + filterImpl.queryToString(fields) + " on timerange " + from + " to " + to);
}
Path outputPath = new Path(baseOutputPath, fileName);
Job job = createJob( basePath
@@ -183,6 +192,7 @@ public class PcapJob {
, fields
, conf
, fs
+ , filterImpl
);
boolean completed = job.waitForCompletion(true);
if(completed) {
@@ -194,24 +204,21 @@ public class PcapJob {
}
- public static void addToConfig(EnumMap<Constants.Fields, String> fields, Configuration conf) {
- for(Map.Entry<Constants.Fields, String> kv : fields.entrySet()) {
- conf.set(kv.getKey().getName(), kv.getValue());
- }
- }
- public Job createJob( Path basePath
+
+ public <T> Job createJob( Path basePath
, Path outputPath
, long beginNS
, long endNS
- , EnumMap<Constants.Fields, String> fields
+ , T fields
, Configuration conf
, FileSystem fs
+ , PcapFilterConfigurator<T> filterImpl
) throws IOException
{
conf.set(PcapMapper.START_TS_CONF, Long.toUnsignedString(beginNS));
conf.set(PcapMapper.END_TS_CONF, Long.toUnsignedString(endNS));
- addToConfig(fields, conf);
+ filterImpl.addToConfig(fields, conf);
Job job = new Job(conf);
job.setJarByClass(PcapJob.class);
job.setMapperClass(PcapJob.PcapMapper.class);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java
new file mode 100644
index 0000000..75871e9
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/PcapFiltersTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.pcap.filter;
+
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+
+public class PcapFiltersTest {
+
+ @Test
+ public void creates_pcap_filters() throws Exception {
+ Assert.assertThat("filter type should be Fixed", PcapFilters.FIXED.create(), instanceOf(FixedPcapFilter.class));
+ Assert.assertThat("filter type should be Query", PcapFilters.QUERY.create(), instanceOf(QueryPcapFilter.class));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
new file mode 100644
index 0000000..b75b9c8
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/fixed/FixedPcapFilterTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter.fixed;
+
+import org.apache.metron.common.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.EnumMap;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class FixedPcapFilterTest {
+
+ @Test
+ public void string_representation_of_query_gets_formatted() throws Exception {
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ String actual = new FixedPcapFilter.Configurator().queryToString(fields);
+ String expected = "src_ip_0_dst_ip_1_false";
+ Assert.assertThat("string representation did not match", actual, equalTo(expected));
+ }
+
+ @Test
+ public void string_representation_of_empty_fields_empty() throws Exception {
+ {
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class);
+ String actual = new FixedPcapFilter.Configurator().queryToString(fields);
+ String expected = "";
+ Assert.assertThat("string representation did not match", actual, equalTo(expected));
+ }
+ {
+ String actual = new FixedPcapFilter.Configurator().queryToString(null);
+ String expected = "";
+ Assert.assertThat("string representation did not match", actual, equalTo(expected));
+ }
+ {
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "");
+ put(Constants.Fields.SRC_PORT, "");
+ }};
+ String actual = new FixedPcapFilter.Configurator().queryToString(fields);
+ String expected = "_";
+ Assert.assertThat("string representation did not match", actual, equalTo(expected));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
new file mode 100644
index 0000000..061066e
--- /dev/null
+++ b/metron-platform/metron-pcap/src/test/java/org/apache/metron/pcap/filter/query/QueryPcapFilterTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter.query;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+public class QueryPcapFilterTest {
+
+ @Test
+ public void string_representation_of_query_gets_formatted() throws Exception {
+ String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
+ String actual = new QueryPcapFilter.Configurator().queryToString(query);
+ String expected = "ip_src_addr_==_srcIp_and_ip_src_port_==_80_and_ip_dst_addr_==_dstIp_and_ip_dst_port_==_100_and_protocol_==_protocol";
+ Assert.assertThat("string representation did not match", actual, equalTo(expected));
+ }
+
+ @Test
+ public void string_representation_of_empty_query_empty() throws Exception {
+ {
+ String query = "";
+ String actual = new QueryPcapFilter.Configurator().queryToString(query);
+ String expected = "";
+ Assert.assertThat("string representation did not match", actual, equalTo(expected));
+ }
+ {
+ String query = " ";
+ String actual = new QueryPcapFilter.Configurator().queryToString(query);
+ String expected = "";
+ Assert.assertThat("string representation did not match", actual, equalTo(expected));
+ }
+ {
+ String query = null;
+ String actual = new QueryPcapFilter.Configurator().queryToString(query);
+ String expected = "";
+ Assert.assertThat("string representation did not match", actual, equalTo(expected));
+ }
+ }
+
+}
[2/2] incubator-metron git commit: METRON-155 Added query filtering
capability for PCAP via Metron REST API (mmiklavcic via cestella) closes
apache/incubator-metron#119
Posted by ce...@apache.org.
METRON-155 Added query filtering capability for PCAP via Metron REST API (mmiklavcic via cestella) closes apache/incubator-metron#119
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/3803df2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/3803df2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/3803df2f
Branch: refs/heads/master
Commit: 3803df2f319738c28b1992a3a9206f5eb65ab6c0
Parents: df8d682
Author: mmiklavc <mi...@gmail.com>
Authored: Thu May 19 09:18:09 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu May 19 09:18:09 2016 -0400
----------------------------------------------------------------------
.../pcapservice/PcapReceiverImplRestEasy.java | 115 ++++++--
.../PcapReceiverImplRestEasyTest.java | 192 ++++++++-----
.../org/apache/metron/common/Constants.java | 24 +-
.../java/org/apache/metron/common/Creator.java | 22 ++
.../metron/common/query/PredicateProcessor.java | 12 +-
.../metron/common/query/QueryParserTest.java | 13 +-
.../apache/metron/pcap/FixedPcapFilterTest.java | 285 +++++++++++++++++++
.../org/apache/metron/pcap/PcapFilterTest.java | 269 -----------------
.../apache/metron/pcap/QueryPcapFilterTest.java | 227 +++++++++++++++
.../PcapTopologyIntegrationTest.java | 103 ++++++-
.../metron/pcap/filter/PcapFieldResolver.java | 42 +++
.../apache/metron/pcap/filter/PcapFilter.java | 28 ++
.../pcap/filter/PcapFilterConfigurator.java | 27 ++
.../apache/metron/pcap/filter/PcapFilters.java | 49 ++++
.../pcap/filter/fixed/FixedPcapFilter.java | 139 +++++++++
.../pcap/filter/query/QueryPcapFilter.java | 78 +++++
.../org/apache/metron/pcap/mr/PcapFilter.java | 121 --------
.../java/org/apache/metron/pcap/mr/PcapJob.java | 59 ++--
.../metron/pcap/filter/PcapFiltersTest.java | 35 +++
.../pcap/filter/fixed/FixedPcapFilterTest.java | 69 +++++
.../pcap/filter/query/QueryPcapFilterTest.java | 58 ++++
21 files changed, 1444 insertions(+), 523 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
index 1d0beb8..9510318 100644
--- a/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
+++ b/metron-platform/metron-api/src/main/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasy.java
@@ -17,29 +17,29 @@
*/
package org.apache.metron.pcapservice;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumMap;
-import java.util.List;
-
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.*;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
-
-import com.google.common.annotations.VisibleForTesting;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
import org.apache.metron.pcap.mr.PcapJob;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.List;
+
@Path("/")
public class PcapReceiverImplRestEasy {
@@ -98,18 +98,84 @@ public class PcapReceiverImplRestEasy {
return false;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * com.cisco.opensoc.hbase.client.IPcapReceiver#getPcapsByIdentifiers(java.lang
- * .String, java.lang.String, java.lang.String, java.lang.String,
- * java.lang.String, long, long, boolean,
- * javax.servlet.http.HttpServletResponse)
- */
+ /**
+ * Enable filtering PCAP results by query filter string and start/end packet TS
+ *
+ * @param query Filter results based on this query
+ * @param startTime Only return packets originating after this start time
+ * @param endTime Only return packets originating before this end time
+ * @param servlet_response
+ * @return REST response
+ * @throws IOException
+ */
@GET
- @Path("/pcapGetter/getPcapsByIdentifiers")
+ @Path("/pcapGetter/getPcapsByQuery")
+ public Response getPcapsByIdentifiers(
+ @QueryParam ("query") String query,
+ @DefaultValue("-1") @QueryParam ("startTime")long startTime,
+ @DefaultValue("-1") @QueryParam ("endTime")long endTime,
+ @Context HttpServletResponse servlet_response)
+ throws IOException {
+ PcapsResponse response = new PcapsResponse();
+ try {
+ if (startTime < 0) {
+ startTime = 0L;
+ }
+ if (endTime < 0) {
+ endTime = System.currentTimeMillis();
+ }
+ if(query == null) {
+ return Response.serverError().status(Response.Status.NO_CONTENT)
+ .entity("Query is null").build();
+ }
+ //convert to nanoseconds since the epoch
+ startTime = TimestampConverters.MILLISECONDS.toNanoseconds(startTime);
+ endTime = TimestampConverters.MILLISECONDS.toNanoseconds(endTime);
+ if(LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Query received: " + query);
+ }
+ response.setPcaps(getQueryUtil().query(new org.apache.hadoop.fs.Path(ConfigurationUtil.getPcapOutputPath())
+ , new org.apache.hadoop.fs.Path(ConfigurationUtil.getTempQueryOutputPath())
+ , startTime
+ , endTime
+ , query
+ , CONFIGURATION.get()
+ , FileSystem.get(CONFIGURATION.get())
+ , new QueryPcapFilter.Configurator()
+ )
+ );
+
+ } catch (Exception e) {
+ LOGGER.error("Exception occurred while fetching Pcaps by identifiers :",
+ e);
+ throw new WebApplicationException("Unable to fetch Pcaps via MR job", e);
+ }
+
+ // return http status '200 OK' along with the complete pcaps response file,
+ // and headers
+ return Response
+ .ok(response.getPcaps(), MediaType.APPLICATION_OCTET_STREAM)
+ .status(200).build();
+ }
+
+ /**
+ * Enable filtering PCAP results by fixed properties and start/end packet TS
+ *
+ * @param srcIp filter value
+ * @param dstIp filter value
+ * @param protocol filter value
+ * @param srcPort filter value
+ * @param dstPort filter value
+ * @param startTime filter value
+ * @param endTime filter value
+ * @param includeReverseTraffic Indicates if filter should check swapped src/dest addresses and IPs
+ * @param servlet_response
+ * @return REST response
+ * @throws IOException
+ */
+ @GET
+ @Path("/pcapGetter/getPcapsByIdentifiers")
public Response getPcapsByIdentifiers(
@QueryParam ("srcIp") String srcIp,
@QueryParam ("dstIp") String dstIp,
@@ -174,6 +240,7 @@ public class PcapReceiverImplRestEasy {
, query
, CONFIGURATION.get()
, FileSystem.get(CONFIGURATION.get())
+ , new FixedPcapFilter.Configurator()
)
);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
index eab4998..1793b06 100644
--- a/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
+++ b/metron-platform/metron-api/src/test/java/org/apache/metron/pcapservice/PcapReceiverImplRestEasyTest.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.pcap.filter.PcapFilterConfigurator;
import org.apache.metron.pcap.mr.PcapJob;
import org.junit.Assert;
import org.junit.Before;
@@ -33,33 +34,43 @@ import java.util.EnumMap;
import java.util.List;
public class PcapReceiverImplRestEasyTest {
- public static class MockQueryHandler extends PcapJob {
+
+ public static class MockQueryHandler<R> extends PcapJob {
Path basePath;
Path baseOutputPath;
long beginNS;
long endNS;
- EnumMap<Constants.Fields, String> fields;
+ R fields;
+ PcapFilterConfigurator<R> filterImpl;
+
@Override
- public List<byte[]> query( Path basePath
+ public <T> List<byte[]> query( Path basePath
, Path baseOutputPath
, long beginNS
, long endNS
- , EnumMap<Constants.Fields, String> fields
+ , T fields
, Configuration conf
, FileSystem fs
+ , PcapFilterConfigurator<T> filterImpl
) throws IOException, ClassNotFoundException, InterruptedException
{
this.basePath = basePath;
this.baseOutputPath = baseOutputPath;
this.beginNS = beginNS;
this.endNS = endNS;
- this.fields = fields;
+ this.fields = (R) fields;
+ this.filterImpl = (PcapFilterConfigurator<R>) filterImpl;
return null;
}
}
- final MockQueryHandler queryHandler = new MockQueryHandler();
- PcapReceiverImplRestEasy restEndpoint = new PcapReceiverImplRestEasy() {{
- this.queryUtil = queryHandler;
+
+ final MockQueryHandler<EnumMap<Constants.Fields, String>> fixedQueryHandler = new MockQueryHandler<EnumMap<Constants.Fields, String>>();
+ final MockQueryHandler<String> queryQueryHandler = new MockQueryHandler<String>();
+ PcapReceiverImplRestEasy fixedRestEndpoint = new PcapReceiverImplRestEasy() {{
+ this.queryUtil = fixedQueryHandler;
+ }};
+ PcapReceiverImplRestEasy queryRestEndpoint = new PcapReceiverImplRestEasy() {{
+ this.queryUtil = queryQueryHandler;
}};
@Before
@@ -69,7 +80,7 @@ public class PcapReceiverImplRestEasyTest {
}
@Test
- public void testNormalPath() throws Exception {
+ public void testNormalFixedPath() throws Exception {
String srcIp = "srcIp";
String dstIp = "dstIp";
String protocol = "protocol";
@@ -80,33 +91,46 @@ public class PcapReceiverImplRestEasyTest {
{
boolean includeReverseTraffic = false;
- restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
- Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
- Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
- Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
- Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
- Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
- Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
- Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS);
- Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+ Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+ Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+ Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+ Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+ Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+ Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
}
{
boolean includeReverseTraffic = true;
- restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
- Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
- Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
- Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
- Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
- Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
- Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
- Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS);
- Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
- Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+ Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+ Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+ Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+ Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+ Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+ Assert.assertEquals(includeReverseTraffic, Boolean.parseBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
}
}
@Test
+ public void testNormalQueryPath() throws Exception {
+ long startTime = 100;
+ long endTime = 1000;
+ String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
+ queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+ Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
+ Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
+ Assert.assertEquals(query, queryQueryHandler.fields);
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryQueryHandler.beginNS);
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryQueryHandler.endNS);
+ }
+
+ @Test
public void testNullSrcIp() throws Exception {
String srcIp = null;
String dstIp = "dstIp";
@@ -116,16 +140,16 @@ public class PcapReceiverImplRestEasyTest {
long startTime = 100;
long endTime = 1000;
boolean includeReverseTraffic = false;
- restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
- Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
- Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
- Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
- Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
- Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
- Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
- Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS);
- Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+ Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+ Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+ Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+ Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+ Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+ Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
}
@Test
@@ -138,60 +162,82 @@ public class PcapReceiverImplRestEasyTest {
long startTime = 100;
long endTime = 1000;
boolean includeReverseTraffic = false;
- restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
- Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
- Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
- Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
- Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
- Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
- Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
- Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), queryHandler.beginNS);
- Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+ Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+ Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+ Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+ Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+ Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(startTime), fixedQueryHandler.beginNS);
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+ Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
}
@Test
public void testEmptyStartTime() throws Exception {
String srcIp = "srcIp";
- String dstIp = null;
+ String dstIp = "dstIp";
String protocol = "protocol";
String srcPort = "80";
String dstPort = "100";
long startTime = -1;
long endTime = 1000;
- boolean includeReverseTraffic = false;
- restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
- Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
- Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
- Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
- Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
- Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
- Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
- Assert.assertEquals(0, queryHandler.beginNS);
- Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryHandler.endNS);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ {
+ boolean includeReverseTraffic = false;
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+ Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+ Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+ Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+ Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+ Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+ Assert.assertEquals(0, fixedQueryHandler.beginNS);
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), fixedQueryHandler.endNS);
+ Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ }
+ {
+ String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
+ queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+ Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
+ Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
+ Assert.assertEquals(query, queryQueryHandler.fields);
+ Assert.assertEquals(0, queryQueryHandler.beginNS);
+ Assert.assertEquals(TimestampConverters.MILLISECONDS.toNanoseconds(endTime), queryQueryHandler.endNS);
+ }
}
@Test
public void testEmptyEndTime() throws Exception {
String srcIp = "srcIp";
- String dstIp = null;
+ String dstIp = "dstIp";
String protocol = "protocol";
String srcPort = "80";
String dstPort = "100";
long startTime = -1;
long endTime = -1;
- boolean includeReverseTraffic = false;
- restEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
- Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryHandler.basePath);
- Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryHandler.baseOutputPath);
- Assert.assertEquals(srcIp, queryHandler.fields.get(Constants.Fields.SRC_ADDR));
- Assert.assertEquals(dstIp, queryHandler.fields.get(Constants.Fields.DST_ADDR));
- Assert.assertEquals(srcPort, queryHandler.fields.get(Constants.Fields.SRC_PORT));
- Assert.assertEquals(dstPort, queryHandler.fields.get(Constants.Fields.DST_PORT));
- Assert.assertEquals(0, queryHandler.beginNS);
- Assert.assertTrue(queryHandler.endNS > 0);
- Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(queryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ {
+ boolean includeReverseTraffic = false;
+ fixedRestEndpoint.getPcapsByIdentifiers(srcIp, dstIp, protocol, srcPort, dstPort, startTime, endTime, includeReverseTraffic, null);
+ Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), fixedQueryHandler.basePath);
+ Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), fixedQueryHandler.baseOutputPath);
+ Assert.assertEquals(srcIp, fixedQueryHandler.fields.get(Constants.Fields.SRC_ADDR));
+ Assert.assertEquals(dstIp, fixedQueryHandler.fields.get(Constants.Fields.DST_ADDR));
+ Assert.assertEquals(srcPort, fixedQueryHandler.fields.get(Constants.Fields.SRC_PORT));
+ Assert.assertEquals(dstPort, fixedQueryHandler.fields.get(Constants.Fields.DST_PORT));
+ Assert.assertEquals(0, fixedQueryHandler.beginNS);
+ Assert.assertTrue(fixedQueryHandler.endNS > 0);
+ Assert.assertEquals(includeReverseTraffic, Boolean.getBoolean(fixedQueryHandler.fields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC)));
+ }
+ {
+ String query = "ip_src_addr == 'srcIp' and ip_src_port == '80' and ip_dst_addr == 'dstIp' and ip_dst_port == '100' and protocol == 'protocol'";
+ queryRestEndpoint.getPcapsByIdentifiers(query, startTime, endTime, null);
+ Assert.assertEquals(new Path(ConfigurationUtil.getPcapOutputPath()), queryQueryHandler.basePath);
+ Assert.assertEquals(new Path(ConfigurationUtil.getTempQueryOutputPath()), queryQueryHandler.baseOutputPath);
+ Assert.assertEquals(query, queryQueryHandler.fields);
+ Assert.assertEquals(0, queryQueryHandler.beginNS);
+ Assert.assertTrue(queryQueryHandler.endNS > 0);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index 60a5b51..7e791d5 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -17,6 +17,9 @@
*/
package org.apache.metron.common;
+import java.util.HashMap;
+import java.util.Map;
+
public class Constants {
public static final String ZOOKEEPER_ROOT = "/metron";
@@ -25,9 +28,11 @@ public class Constants {
public static final String SENSOR_TYPE = "source.type";
public static final String ENRICHMENT_TOPIC = "enrichments";
public static final String ERROR_STREAM = "error";
+ public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";
+ public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
public static enum Fields {
- SRC_ADDR("ip_src_addr")
+ SRC_ADDR("ip_src_addr")
,SRC_PORT("ip_src_port")
,DST_ADDR("ip_dst_addr")
,DST_PORT("ip_dst_port")
@@ -35,16 +40,29 @@ public class Constants {
,TIMESTAMP("timestamp")
,INCLUDES_REVERSE_TRAFFIC("includes_reverse_traffic")
;
+ private static Map<String, Fields> nameToField;
+
+ static {
+ nameToField = new HashMap<>();
+ for (Fields f : Fields.values()) {
+ nameToField.put(f.getName(), f);
+ }
+ }
+
private String name;
+
Fields(String name) {
this.name = name;
}
+
public String getName() {
return name;
}
+
+ public static Fields fromString(String fieldName) {
+ return nameToField.get(fieldName);
+ }
}
- public static final String SIMPLE_HBASE_ENRICHMENT = "hbaseEnrichment";
- public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java
new file mode 100644
index 0000000..b813dc5
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Creator.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common;
+
+public interface Creator<T> {
+ T create();
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java
index 6499319..26e4da8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/query/PredicateProcessor.java
@@ -18,12 +18,20 @@
package org.apache.metron.common.query;
-import org.antlr.v4.runtime.*;
-import org.apache.metron.common.query.generated.*;
+import org.antlr.v4.runtime.ANTLRInputStream;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStream;
+import org.apache.metron.common.query.generated.PredicateLexer;
+import org.apache.metron.common.query.generated.PredicateParser;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
public class PredicateProcessor {
public boolean parse(String rule, VariableResolver resolver) {
+ if (rule == null || isEmpty(rule.trim())) {
+ return true;
+ }
ANTLRInputStream input = new ANTLRInputStream(rule);
PredicateLexer lexer = new PredicateLexer(input);
lexer.removeErrorListeners();
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java
index ad798e2..4660002 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/query/QueryParserTest.java
@@ -31,14 +31,14 @@ public class QueryParserTest {
PredicateProcessor processor = new PredicateProcessor();
try {
processor.validate("'foo'");
- Assert.fail("Invalid rule found to be valid.");
+ Assert.fail("Invalid rule found to be valid - lone value.");
}
catch(ParseException e) {
}
try {
processor.validate("enrichedField1 == 'enrichedValue1");
- Assert.fail("Invalid rule found to be valid.");
+ Assert.fail("Invalid rule found to be valid - unclosed single quotes.");
}
catch(ParseException e) {
@@ -67,6 +67,9 @@ public class QueryParserTest {
Assert.assertTrue(run("foo== foo", v -> variableMap.get(v)));
Assert.assertTrue(run("empty== ''", v -> variableMap.get(v)));
Assert.assertTrue(run("spaced == 'metron is great'", v -> variableMap.get(v)));
+ Assert.assertTrue(run(null, v -> variableMap.get(v)));
+ Assert.assertTrue(run("", v -> variableMap.get(v)));
+ Assert.assertTrue(run(" ", v -> variableMap.get(v)));
}
@Test
@@ -82,7 +85,10 @@ public class QueryParserTest {
Assert.assertFalse(run("('casey' == foo) and (FALSE == TRUE)", v -> variableMap.get(v)));
Assert.assertFalse(run("'casey' == foo and FALSE", v -> variableMap.get(v)));
Assert.assertTrue(run("'casey' == foo and true", v -> variableMap.get(v)));
+ Assert.assertTrue(run("true", v -> variableMap.get(v)));
+ Assert.assertTrue(run("TRUE", v -> variableMap.get(v)));
}
+
@Test
public void testList() throws Exception {
final Map<String, String> variableMap = new HashMap<String, String>() {{
@@ -98,6 +104,7 @@ public class QueryParserTest {
Assert.assertFalse(run("foo not in [ 'casey', 'david' ]", v -> variableMap.get(v)));
Assert.assertFalse(run("foo not in [ 'casey', 'david' ] and 'casey' == foo", v -> variableMap.get(v)));
}
+
@Test
public void testExists() throws Exception {
final Map<String, String> variableMap = new HashMap<String, String>() {{
@@ -123,6 +130,7 @@ public class QueryParserTest {
Assert.assertTrue(run("TO_UPPER(foo) in [ TO_UPPER('casey'), 'david' ] and IN_SUBNET(ip, '192.168.0.0/24')", v -> variableMap.get(v)));
Assert.assertFalse(run("TO_LOWER(foo) in [ TO_UPPER('casey'), 'david' ]", v -> variableMap.get(v)));
}
+
@Test
public void testLogicalFunctions() throws Exception {
final Map<String, String> variableMap = new HashMap<String, String>() {{
@@ -149,4 +157,5 @@ public class QueryParserTest {
Assert.assertFalse(run("IN_SUBNET(ip_dst_addr, '192.168.0.0/24')", v-> variableMap.get(v)));
Assert.assertTrue(run("not(IN_SUBNET(ip_dst_addr, '192.168.0.0/24'))", v-> variableMap.get(v)));
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
new file mode 100644
index 0000000..218d143
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/FixedPcapFilterTest.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.EnumMap;
+
+public class FixedPcapFilterTest {
+ @Test
+ public void testTrivialEquality() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ }
+
+ @Test
+ public void testReverseTraffic() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "dst_ip");
+ put(Constants.Fields.SRC_PORT, 1);
+ put(Constants.Fields.DST_ADDR, "src_ip");
+ put(Constants.Fields.DST_PORT, 0);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "dst_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "src_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null));
+ }
+ }
+@Test
+public void testMissingDstAddr() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip1");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null));
+ }
+}
+ @Test
+ public void testMissingDstPort() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 100);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 100);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 100);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null));
+ }
+ }
+ @Test
+ public void testMissingSrcAddr() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_PORT, "0");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ }
+ @Test
+ public void testMissingSrcPort() throws Exception {
+ Configuration config = new Configuration();
+ final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, "1");
+ put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
+ }};
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new FixedPcapFilter.Configurator().addToConfig(fields, config);
+ {
+ FixedPcapFilter filter = new FixedPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 100);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
deleted file mode 100644
index 70c973a..0000000
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapFilterTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.pcap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.*;
-import org.apache.metron.common.Constants;
-import org.apache.metron.pcap.mr.PcapFilter;
-import org.apache.metron.pcap.mr.PcapJob;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.EnumMap;
-
-public class PcapFilterTest {
- @Test
- public void testTrivialEquality() throws Exception {
- Configuration config = new Configuration();
- final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, "0");
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
- }};
- PcapJob.addToConfig(fields, config);
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, 0);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 1);
- }};
- }
- };
- Assert.assertTrue(filter.apply(null));
- }
- }
-
- @Test
- public void testReverseTraffic() throws Exception {
- Configuration config = new Configuration();
- final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, "0");
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "true");
- }};
- PcapJob.addToConfig(fields, config);
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, 0);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 1);
- }};
- }
- };
- Assert.assertTrue(filter.apply(null));
- }
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "dst_ip");
- put(Constants.Fields.SRC_PORT, 1);
- put(Constants.Fields.DST_ADDR, "src_ip");
- put(Constants.Fields.DST_PORT, 0);
- }};
- }
- };
- Assert.assertTrue(filter.apply(null));
- }
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "dst_ip");
- put(Constants.Fields.SRC_PORT, 0);
- put(Constants.Fields.DST_ADDR, "src_ip");
- put(Constants.Fields.DST_PORT, 1);
- }};
- }
- };
- Assert.assertFalse(filter.apply(null));
- }
- }
-@Test
-public void testMissingDstAddr() throws Exception {
- Configuration config = new Configuration();
- final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, "0");
- put(Constants.Fields.DST_PORT, "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
- }};
- PcapJob.addToConfig(fields, config);
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, 0);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 1);
- }};
- }
- };
- Assert.assertTrue(filter.apply(null));
- }
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip1");
- put(Constants.Fields.SRC_PORT, 0);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 1);
- }};
- }
- };
- Assert.assertFalse(filter.apply(null));
- }
-}
- @Test
- public void testMissingDstPort() throws Exception {
- Configuration config = new Configuration();
- final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, "0");
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
- }};
- PcapJob.addToConfig(fields, config);
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, 0);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 1);
- }};
- }
- };
- Assert.assertTrue(filter.apply(null));
- }
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, 0);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 100);
- }};
- }
- };
- Assert.assertTrue(filter.apply(null));
- }
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, 100);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 100);
- }};
- }
- };
- Assert.assertFalse(filter.apply(null));
- }
- }
- @Test
- public void testMissingSrcAddr() throws Exception {
- Configuration config = new Configuration();
- final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_PORT, "0");
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
- }};
- PcapJob.addToConfig(fields, config);
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, 0);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 1);
- }};
- }
- };
- Assert.assertTrue(filter.apply(null));
- }
- }
- @Test
- public void testMissingSrcPort() throws Exception {
- Configuration config = new Configuration();
- final EnumMap<Constants.Fields, String> fields = new EnumMap<Constants.Fields, String>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, "1");
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC, "false");
- }};
- PcapJob.addToConfig(fields, config);
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, 0);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 1);
- }};
- }
- };
- Assert.assertTrue(filter.apply(null));
- }
- {
- PcapFilter filter = new PcapFilter(config) {
- @Override
- protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
- return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
- put(Constants.Fields.SRC_ADDR, "src_ip");
- put(Constants.Fields.SRC_PORT, 100);
- put(Constants.Fields.DST_ADDR, "dst_ip");
- put(Constants.Fields.DST_PORT, 1);
- }};
- }
- };
- Assert.assertTrue(filter.apply(null));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
new file mode 100644
index 0000000..f07dc48
--- /dev/null
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/QueryPcapFilterTest.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.filter.PcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.EnumMap;
+
+public class QueryPcapFilterTest {
+
+ @Test
+ public void testEmptyQueryFilter() throws Exception {
+ Configuration config = new Configuration();
+ String query = "";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ PcapFilter filter = new QueryPcapFilter() {
+
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ }
+
+ @Test
+ public void testTrivialEquality() throws Exception {
+ Configuration config = new Configuration();
+ String query = "ip_src_addr == 'src_ip' and ip_src_port == '0' and ip_dst_addr == 'dst_ip' and ip_dst_port == '1'";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ PcapFilter filter = new QueryPcapFilter() {
+
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ }
+
+ @Test
+ public void testMissingDstAddr() throws Exception {
+ Configuration config = new Configuration();
+ String query = "ip_src_addr == 'src_ip' and ip_src_port == '0' and ip_dst_port == '1'";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip1");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null));
+ }
+ }
+
+ @Test
+ public void testMissingDstPort() throws Exception {
+ Configuration config = new Configuration();
+ String query = "ip_src_addr == 'src_ip' and ip_src_port == '0' and ip_dst_addr == 'dst_ip'";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 100);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 100);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 100);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertFalse(filter.test(null));
+ }
+ }
+
+ @Test
+ public void testMissingSrcAddr() throws Exception {
+ Configuration config = new Configuration();
+ String query = "ip_src_port == '0' and ip_dst_addr == 'dst_ip' and ip_dst_port == '1'";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ }
+
+ @Test
+ public void testMissingSrcPort() throws Exception {
+ Configuration config = new Configuration();
+ String query = "ip_src_addr == 'src_ip' and ip_dst_addr == 'dst_ip' and ip_dst_port == '1'";
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 0);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ new QueryPcapFilter.Configurator().addToConfig(query, config);
+ {
+ QueryPcapFilter filter = new QueryPcapFilter() {
+ @Override
+ protected EnumMap<Constants.Fields, Object> packetToFields(PacketInfo pi) {
+ return new EnumMap<Constants.Fields, Object>(Constants.Fields.class) {{
+ put(Constants.Fields.SRC_ADDR, "src_ip");
+ put(Constants.Fields.SRC_PORT, 100);
+ put(Constants.Fields.DST_ADDR, "dst_ip");
+ put(Constants.Fields.DST_PORT, 1);
+ }};
+ }
+ };
+ filter.configure(config);
+ Assert.assertTrue(filter.test(null));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 1142da6..03e3639 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -25,7 +25,8 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import kafka.consumer.ConsumerIterator;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
@@ -42,6 +43,8 @@ import org.apache.metron.integration.utils.KafkaUtil;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.pcap.PcapMerger;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
import org.apache.metron.pcap.mr.PcapJob;
import org.apache.metron.spout.pcap.Endianness;
import org.apache.metron.spout.pcap.scheme.TimestampScheme;
@@ -51,7 +54,10 @@ import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
import java.util.*;
public class PcapTopologyIntegrationTest {
@@ -221,7 +227,6 @@ public class PcapTopologyIntegrationTest {
}
}
);
- //.withExistingZookeeper("localhost:2000");
final MRComponent mr = new MRComponent().withBasePath(baseDir.getAbsolutePath());
@@ -274,6 +279,22 @@ public class PcapTopologyIntegrationTest {
, new EnumMap<>(Constants.Fields.class)
, new Configuration()
, FileSystem.get(new Configuration())
+ , new FixedPcapFilter.Configurator()
+ );
+ Assert.assertEquals(results.size(), 2);
+ }
+ {
+ // Ensure that only two pcaps are returned when we look at 4 and 5
+ // test with empty query filter
+ List<byte[]> results =
+ job.query(new Path(outDir.getAbsolutePath())
+ , new Path(queryDir.getAbsolutePath())
+ , getTimestamp(4, pcapEntries)
+ , getTimestamp(5, pcapEntries)
+ , ""
+ , new Configuration()
+ , FileSystem.get(new Configuration())
+ , new QueryPcapFilter.Configurator()
);
Assert.assertEquals(results.size(), 2);
}
@@ -289,6 +310,22 @@ public class PcapTopologyIntegrationTest {
}}
, new Configuration()
, FileSystem.get(new Configuration())
+ , new FixedPcapFilter.Configurator()
+ );
+ Assert.assertEquals(results.size(), 0);
+ }
+ {
+ // ensure that none get returned since that destination IP address isn't in the dataset
+ // test with query filter
+ List<byte[]> results =
+ job.query(new Path(outDir.getAbsolutePath())
+ , new Path(queryDir.getAbsolutePath())
+ , getTimestamp(0, pcapEntries)
+ , getTimestamp(1, pcapEntries)
+ , "ip_dst_addr == '207.28.210.1'"
+ , new Configuration()
+ , FileSystem.get(new Configuration())
+ , new QueryPcapFilter.Configurator()
);
Assert.assertEquals(results.size(), 0);
}
@@ -304,6 +341,22 @@ public class PcapTopologyIntegrationTest {
}}
, new Configuration()
, FileSystem.get(new Configuration())
+ , new FixedPcapFilter.Configurator()
+ );
+ Assert.assertEquals(results.size(), 0);
+ }
+ {
+ //same with protocol as before with the destination addr
+ //test with query filter
+ List<byte[]> results =
+ job.query(new Path(outDir.getAbsolutePath())
+ , new Path(queryDir.getAbsolutePath())
+ , getTimestamp(0, pcapEntries)
+ , getTimestamp(1, pcapEntries)
+ , "protocol == 'foo'"
+ , new Configuration()
+ , FileSystem.get(new Configuration())
+ , new QueryPcapFilter.Configurator()
);
Assert.assertEquals(results.size(), 0);
}
@@ -317,6 +370,22 @@ public class PcapTopologyIntegrationTest {
, new EnumMap<>(Constants.Fields.class)
, new Configuration()
, FileSystem.get(new Configuration())
+ , new FixedPcapFilter.Configurator()
+ );
+ Assert.assertEquals(results.size(), pcapEntries.size());
+ }
+ {
+ //make sure I get them all.
+ //with query filter
+ List<byte[]> results =
+ job.query(new Path(outDir.getAbsolutePath())
+ , new Path(queryDir.getAbsolutePath())
+ , getTimestamp(0, pcapEntries)
+ , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+ , ""
+ , new Configuration()
+ , FileSystem.get(new Configuration())
+ , new QueryPcapFilter.Configurator()
);
Assert.assertEquals(results.size(), pcapEntries.size());
}
@@ -331,6 +400,34 @@ public class PcapTopologyIntegrationTest {
}}
, new Configuration()
, FileSystem.get(new Configuration())
+ , new FixedPcapFilter.Configurator()
+ );
+ Assert.assertTrue(results.size() > 0);
+ Assert.assertEquals(results.size()
+ , Iterables.size(filterPcaps(pcapEntries, new Predicate<JSONObject>() {
+ @Override
+ public boolean apply(@Nullable JSONObject input) {
+ Object prt = input.get(Constants.Fields.DST_PORT.getName());
+ return prt != null && prt.toString().equals("22");
+ }
+ }, withHeaders)
+ )
+ );
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PcapMerger.merge(baos, results);
+ Assert.assertTrue(baos.toByteArray().length > 0);
+ }
+ {
+ //test with query filter
+ List<byte[]> results =
+ job.query(new Path(outDir.getAbsolutePath())
+ , new Path(queryDir.getAbsolutePath())
+ , getTimestamp(0, pcapEntries)
+ , getTimestamp(pcapEntries.size()-1, pcapEntries) + 1
+ , "ip_dst_port == '22'"
+ , new Configuration()
+ , FileSystem.get(new Configuration())
+ , new QueryPcapFilter.Configurator()
);
Assert.assertTrue(results.size() > 0);
Assert.assertEquals(results.size()
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
new file mode 100644
index 0000000..50537e1
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFieldResolver.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.query.VariableResolver;
+
+import java.util.EnumMap;
+
+public class PcapFieldResolver implements VariableResolver {
+ EnumMap<Constants.Fields, Object> fieldsMap = null;
+
+ public PcapFieldResolver(EnumMap<Constants.Fields, Object> fieldsMap) {
+ this.fieldsMap = fieldsMap;
+ }
+
+ @Override
+ public String resolve(String variable) {
+ Object obj = fieldsMap.get(Constants.Fields.fromString(variable));
+ if (obj != null) {
+ return obj.toString();
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
new file mode 100644
index 0000000..c7168aa
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilter.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter;
+
+import org.apache.metron.pcap.PacketInfo;
+
+import java.util.Map;
+import java.util.function.Predicate;
+
+public interface PcapFilter extends Predicate<PacketInfo> {
+ void configure(Iterable<Map.Entry<String, String>> config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java
new file mode 100644
index 0000000..43e79ce
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilterConfigurator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface PcapFilterConfigurator<T> {
+ public static final String PCAP_FILTER_NAME_CONF = "PCAP_FILTER_NAME";
+ void addToConfig(T fields, Configuration conf);
+ String queryToString(T fields);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/3803df2f/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java
new file mode 100644
index 0000000..7d9e285
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/filter/PcapFilters.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.filter;
+
+import org.apache.metron.common.Creator;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+
+public enum PcapFilters implements Creator<PcapFilter> {
+ FIXED(new Creator<PcapFilter>() {
+ @Override
+ public PcapFilter create() {
+ return new FixedPcapFilter();
+ }
+ }),
+ QUERY(new Creator<PcapFilter>() {
+ @Override
+ public PcapFilter create() {
+ return new QueryPcapFilter();
+ }
+ });
+
+ Creator<PcapFilter> creator;
+
+ PcapFilters(Creator<PcapFilter> creator) {
+ this.creator = creator;
+ }
+
+ public PcapFilter create() {
+ return creator.create();
+ }
+
+}