You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/05/29 15:00:20 UTC
incubator-metron git commit: METRON-178 Expose the filter capability
in the Parser topologies. closes apache/incubator-metron#129
Repository: incubator-metron
Updated Branches:
refs/heads/master ed96afe08 -> ddc6c07c6
METRON-178 Expose the filter capability in the Parser topologies. closes apache/incubator-metron#129
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ddc6c07c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ddc6c07c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ddc6c07c
Branch: refs/heads/master
Commit: ddc6c07c659d7fc49d1d832bbe46c44624ee9eee
Parents: ed96afe
Author: cstella <ce...@gmail.com>
Authored: Sun May 29 11:00:12 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Sun May 29 11:00:12 2016 -0400
----------------------------------------------------------------------
.../common/bolt/ConfiguredParserBolt.java | 13 +++-
.../configuration/SensorParserConfig.java | 13 ++++
.../metron/common/utils/ReflectionUtils.java | 11 ++-
.../common/bolt/ConfiguredParserBoltTest.java | 2 +-
.../apache/metron/parsers/bolt/ParserBolt.java | 27 ++++---
.../parsers/filters/AbstractMessageFilter.java | 30 ++++++++
.../parsers/filters/BroMessageFilter.java | 77 +++++++++++---------
.../apache/metron/parsers/filters/Filters.java | 58 +++++++++++++++
.../parsers/filters/GenericMessageFilter.java | 11 ++-
.../metron/parsers/filters/QueryFilter.java | 57 +++++++++++++++
.../metron/parsers/interfaces/Configurable.java | 26 +++++++
.../parsers/interfaces/MessageFilter.java | 6 +-
.../parsers/interfaces/MessageParser.java | 4 +-
.../org/apache/metron/filters/FiltersTest.java | 57 +++++++++++++++
.../metron/parsers/bolt/ParserBoltTest.java | 47 ++++++++++++
15 files changed, 385 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
index 543f87b..feab40e 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java
@@ -22,6 +22,7 @@ import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
import java.io.IOException;
@@ -30,15 +31,23 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt {
private static final Logger LOG = Logger.getLogger(ConfiguredEnrichmentBolt.class);
protected final ParserConfigurations configurations = new ParserConfigurations();
-
- public ConfiguredParserBolt(String zookeeperUrl) {
+ private String sensorType;
+ public ConfiguredParserBolt(String zookeeperUrl, String sensorType) {
super(zookeeperUrl);
+ this.sensorType = sensorType;
+ }
+
+ protected SensorParserConfig getSensorParserConfig() {
+ return getConfigurations().getSensorParserConfig(sensorType);
}
public ParserConfigurations getConfigurations() {
return configurations;
}
+ public String getSensorType() {
+ return sensorType;
+ }
@Override
public void loadConfig() {
try {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 106c55a..5d1bda9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -29,6 +29,7 @@ import java.util.Map;
public class SensorParserConfig {
private String parserClassName;
+ private String filterClassName;
private String sensorTopic;
private Map<String, Object> parserConfig = new HashMap<>();
private List<FieldTransformer> fieldTransformations = new ArrayList<>();
@@ -41,6 +42,14 @@ public class SensorParserConfig {
this.fieldTransformations = fieldTransformations;
}
+ public String getFilterClassName() {
+ return filterClassName;
+ }
+
+ public void setFilterClassName(String filterClassName) {
+ this.filterClassName = filterClassName;
+ }
+
public String getParserClassName() {
return parserClassName;
}
@@ -86,6 +95,7 @@ public class SensorParserConfig {
public String toString() {
return "SensorParserConfig{" +
"parserClassName='" + parserClassName + '\'' +
+ ", filterClassName='" + filterClassName + '\'' +
", sensorTopic='" + sensorTopic + '\'' +
", parserConfig=" + parserConfig +
", fieldTransformations=" + fieldTransformations +
@@ -101,6 +111,8 @@ public class SensorParserConfig {
if (getParserClassName() != null ? !getParserClassName().equals(that.getParserClassName()) : that.getParserClassName() != null)
return false;
+ if (getFilterClassName() != null ? !getFilterClassName().equals(that.getFilterClassName()) : that.getFilterClassName() != null)
+ return false;
if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null)
return false;
if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
@@ -112,6 +124,7 @@ public class SensorParserConfig {
@Override
public int hashCode() {
int result = getParserClassName() != null ? getParserClassName().hashCode() : 0;
+ result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0);
result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
index 141221d..dadcb1b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
@@ -32,10 +32,9 @@ public class ReflectionUtils<T> {
return instance;
}
- public static <T> T createInstance(String className) {
+ public static <T> T createInstance(Class<? extends T> clazz) {
T instance;
try {
- Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
instance = clazz.getConstructor().newInstance();
} catch (InstantiationException e) {
throw new IllegalStateException("Unable to instantiate connector.", e);
@@ -45,6 +44,14 @@ public class ReflectionUtils<T> {
throw new IllegalStateException("Unable to instantiate connector", e);
} catch (NoSuchMethodException e) {
throw new IllegalStateException("Unable to instantiate connector: no such method", e);
+ }
+ return instance;
+ }
+ public static <T> T createInstance(String className) {
+ T instance;
+ try {
+ Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+ instance = createInstance(clazz);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Unable to instantiate connector: class not found", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
index f945dd0..3010ed8 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/bolt/ConfiguredParserBoltTest.java
@@ -43,7 +43,7 @@ public class ConfiguredParserBoltTest extends BaseConfiguredBoltTest {
public static class StandAloneConfiguredParserBolt extends ConfiguredParserBolt {
public StandAloneConfiguredParserBolt(String zookeeperUrl) {
- super(zookeeperUrl);
+ super(zookeeperUrl, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 2a64666..addeaab 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -23,6 +23,8 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.metron.common.Constants;
import org.apache.metron.common.bolt.ConfiguredParserBolt;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.parsers.filters.Filters;
import org.apache.metron.common.configuration.FieldTransformer;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.parsers.filters.GenericMessageFilter;
@@ -39,14 +41,12 @@ public class ParserBolt extends ConfiguredParserBolt {
private OutputCollector collector;
private MessageParser<JSONObject> parser;
- private MessageFilter<JSONObject> filter = new GenericMessageFilter();
+ private MessageFilter<JSONObject> filter;
private MessageWriter<JSONObject> writer;
- private String sensorType;
public ParserBolt(String zookeeperUrl, String sensorType, MessageParser<JSONObject> parser, MessageWriter<JSONObject> writer) {
- super(zookeeperUrl);
+ super(zookeeperUrl, sensorType);
this.parser = parser;
- this.sensorType = sensorType;
this.writer = writer;
}
@@ -60,35 +60,44 @@ public class ParserBolt extends ConfiguredParserBolt {
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
super.prepare(stormConf, context, collector);
this.collector = collector;
+ if(getSensorParserConfig() == null) {
+ filter = new GenericMessageFilter();
+ }
+ else if(filter == null) {
+ filter = Filters.get(getSensorParserConfig().getFilterClassName()
+ , getSensorParserConfig().getParserConfig()
+ );
+ }
parser.init();
writer.init();
- SensorParserConfig config = getConfigurations().getSensorParserConfig(sensorType);
+ SensorParserConfig config = getSensorParserConfig();
if(config != null) {
config.init();
}
else {
- throw new IllegalStateException("Unable to retrieve a parser config for " + sensorType);
+ throw new IllegalStateException("Unable to retrieve a parser config for " + getSensorType());
}
}
+
@SuppressWarnings("unchecked")
@Override
public void execute(Tuple tuple) {
byte[] originalMessage = tuple.getBinary(0);
- SensorParserConfig sensorParserConfig = getConfigurations().getSensorParserConfig(sensorType);
+ SensorParserConfig sensorParserConfig = getSensorParserConfig();
try {
if(sensorParserConfig != null) {
List<JSONObject> messages = parser.parse(originalMessage);
for (JSONObject message : messages) {
if (parser.validate(message)) {
if (filter != null && filter.emitTuple(message)) {
- message.put(Constants.SENSOR_TYPE, sensorType);
+ message.put(Constants.SENSOR_TYPE, getSensorType());
for (FieldTransformer handler : sensorParserConfig.getFieldTransformations()) {
if (handler != null) {
handler.transformAndUpdate(message, sensorParserConfig.getParserConfig());
}
}
- writer.write(sensorType, configurations, tuple, message);
+ writer.write(getSensorType(), configurations, tuple, message);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/AbstractMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/AbstractMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/AbstractMessageFilter.java
new file mode 100644
index 0000000..8fa2af9
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/AbstractMessageFilter.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.filters;
+
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public abstract class AbstractMessageFilter implements MessageFilter<JSONObject>, Serializable{
+ public AbstractMessageFilter(Map<String, Object> config) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
index d026d08..2145dfe 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/BroMessageFilter.java
@@ -24,39 +24,50 @@ import org.json.simple.JSONObject;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-public class BroMessageFilter implements MessageFilter<JSONObject>,
- Serializable {
-
- /**
- * Filter protocols based on whitelists and blacklists
- */
-
- private static final long serialVersionUID = -3824683649114625033L;
- private String _key;
- private final Set<String> _known_protocols;
-
- /**
- * @param conf Commons configuration for reading properties files
- * @param key Key in a JSON mesage where the protocol field is located
- */
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public BroMessageFilter(Configuration conf, String key) {
- _key = key;
- _known_protocols = new HashSet<>();
- List known_protocols = conf.getList("source.known.protocols");
- _known_protocols.addAll(known_protocols);
- }
-
- /**
- * @param message JSON representation of a message with a protocol field
- * @return False if message if filtered and True if message is not filtered
- */
-
- public boolean emitTuple(JSONObject message) {
- String protocol = (String) message.get(_key);
- return _known_protocols.contains(protocol);
- }
+public class BroMessageFilter implements MessageFilter<JSONObject>{
+
+ /**
+ * Filter protocols based on whitelists and blacklists
+ */
+
+ private static final long serialVersionUID = -3824683649114625033L;
+ private String _key;
+ private final Set<String> _known_protocols;
+
+ public BroMessageFilter() {
+ _known_protocols = new HashSet<>();
+ }
+
+ public void configure(Map<String, Object> config) {
+ Object protocolsObj = config.get("bro.filter.source.known.protocols");
+ Object keyObj = config.get("bro.filter.source.key");
+ if(keyObj != null) {
+ _key = keyObj.toString();
+ }
+ if(protocolsObj != null) {
+ if(protocolsObj instanceof String) {
+ _known_protocols.clear();
+ _known_protocols.add(protocolsObj.toString());
+ }
+ else if(protocolsObj instanceof List) {
+ _known_protocols.clear();
+ for(Object o : (List)protocolsObj) {
+ _known_protocols.add(o.toString());
+ }
+ }
+ }
+ }
+
+ /**
+ * @param message JSON representation of a message with a protocol field
+ * @return False if message is filtered and True if message is not filtered
+ */
+
+ public boolean emitTuple(JSONObject message) {
+ String protocol = (String) message.get(_key);
+ return _known_protocols.contains(protocol);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
new file mode 100644
index 0000000..fc9a49e
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.filters;
+
+import org.apache.metron.common.utils.ReflectionUtils;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+public enum Filters {
+ BRO(BroMessageFilter.class)
+ ,QUERY(QueryFilter.class)
+ ,DEFAULT(GenericMessageFilter.class)
+ ;
+ Class<? extends MessageFilter> clazz;
+ Filters(Class<? extends MessageFilter> clazz) {
+ this.clazz = clazz;
+ }
+ public static MessageFilter<JSONObject> get(String filterName, Map<String, Object> config) {
+ if(filterName == null || filterName.trim().isEmpty()) {
+ return new GenericMessageFilter();
+ }
+ Class<? extends MessageFilter> filterClass;
+ try {
+ Filters f = Filters.valueOf(filterName);
+ filterClass = f.clazz;
+ }
+ catch(Exception ex) {
+ try {
+ filterClass = (Class<? extends MessageFilter>) Class.forName(filterName);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Unable to find class " + filterName, e);
+ }
+ }
+ MessageFilter<JSONObject> filter = ReflectionUtils.createInstance(filterClass);
+ filter.configure(config);
+ return filter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
index 9defe32..e4524ae 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
@@ -21,14 +21,21 @@ import org.apache.metron.parsers.interfaces.MessageFilter;
import org.json.simple.JSONObject;
import java.io.Serializable;
+import java.util.Map;
-public class GenericMessageFilter implements MessageFilter<JSONObject>,
- Serializable {
+public class GenericMessageFilter implements MessageFilter<JSONObject>{
private static final long serialVersionUID = 3626397212398318852L;
+ public GenericMessageFilter() {
+ }
+
public boolean emitTuple(JSONObject message) {
return true;
}
+ @Override
+ public void configure(Map<String, Object> config) {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java
new file mode 100644
index 0000000..ee64fb4
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.filters;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.common.query.MapVariableResolver;
+import org.apache.metron.common.query.PredicateProcessor;
+import org.apache.metron.common.query.VariableResolver;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class QueryFilter implements MessageFilter<JSONObject> {
+ public static final String QUERY_STRING_CONF = "filter.query";
+ private PredicateProcessor processor = new PredicateProcessor();
+ private String query;
+
+ public QueryFilter()
+ {
+
+ }
+
+ @Override
+ public void configure(Map<String, Object> config) {
+ Object o = config.get(QUERY_STRING_CONF);
+ if(o instanceof String) {
+ query= o.toString();
+ }
+
+ processor.validate(query, true);
+ }
+
+ @Override
+ public boolean emitTuple(JSONObject message) {
+ VariableResolver resolver = new MapVariableResolver(message);
+ return processor.parse(query, resolver);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/Configurable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/Configurable.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/Configurable.java
new file mode 100644
index 0000000..4a707e6
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/Configurable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.interfaces;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public interface Configurable extends Serializable {
+ void configure(Map<String, Object> config);
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
index 2e5ab29..10a899b 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageFilter.java
@@ -17,8 +17,10 @@
*/
package org.apache.metron.parsers.interfaces;
-public interface MessageFilter<T> {
+import java.io.Serializable;
+import java.util.Map;
- boolean emitTuple(T message);
+public interface MessageFilter<T> extends Configurable{
+ boolean emitTuple(T message);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
index 81d1b1a..439f06d 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/interfaces/MessageParser.java
@@ -21,9 +21,7 @@ import java.io.Serializable;
import java.util.List;
import java.util.Map;
-public interface MessageParser<T> extends Serializable {
-
- void configure(Map<String, Object> parserConfig);
+public interface MessageParser<T> extends Configurable{
void init();
List<T> parse(byte[] rawMessage);
boolean validate(T message);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
new file mode 100644
index 0000000..9df0101
--- /dev/null
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.filters;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.logging.log4j.core.filter.AbstractFilter;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.parsers.filters.AbstractMessageFilter;
+import org.apache.metron.parsers.filters.Filters;
+import org.apache.metron.parsers.filters.GenericMessageFilter;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FiltersTest {
+ @Test
+ public void testDefault() {
+ Assert.assertTrue(Filters.get("DEFAULT", null).emitTuple(null));
+ Assert.assertTrue(Filters.get(GenericMessageFilter.class.getName(), null).emitTuple(null));
+ }
+
+ @Test
+ public void testSingleQueryFilter() throws Exception {
+ {
+ Map<String, Object> config = new HashMap<String, Object>() {{
+ put("filter.query", "exists(foo)");
+ }};
+ MessageFilter<JSONObject> filter = Filters.get("QUERY", config);
+ Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1))));
+ Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1))));
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ddc6c07c/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index b90e521..658d52e 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.metron.parsers.bolt;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.test.bolt.BaseBoltTest;
@@ -29,6 +31,7 @@ import org.json.simple.parser.JSONParser;
import org.junit.Test;
import org.mockito.Mock;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -100,4 +103,48 @@ public class ParserBoltTest extends BaseBoltTest {
verify(outputCollector, times(1)).reportError(any(Throwable.class));
}
+ /**
+ {
+ "filterClassName" : "QUERY"
+ ,"parserConfig" : {
+ "filter.query" : "exists(field1)"
+ }
+ }
+ */
+ @Multiline
+ public static String sensorParserConfig;
+ @Test
+ public void testFilter() throws Exception {
+ String sensorType = "yaf";
+ ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, writer) {
+ @Override
+ protected SensorParserConfig getSensorParserConfig() {
+ try {
+ return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ parserBolt.setCuratorFramework(client);
+ parserBolt.setTreeCache(cache);
+ parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+ verify(parser, times(1)).init();
+ verify(writer, times(1)).init();
+ byte[] sampleBinary = "some binary message".getBytes();
+ JSONParser jsonParser = new JSONParser();
+ final JSONObject sampleMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\" }");
+ final JSONObject sampleMessage2 = (JSONObject) jsonParser.parse("{ \"field2\":\"value2\" }");
+ List<JSONObject> messages = new ArrayList<JSONObject>() {{
+ add(sampleMessage1);
+ add(sampleMessage2);
+ }};
+ final JSONObject finalMessage1 = (JSONObject) jsonParser.parse("{ \"field1\":\"value1\", \"source.type\":\"" + sensorType + "\" }");
+ when(tuple.getBinary(0)).thenReturn(sampleBinary);
+ when(parser.parse(sampleBinary)).thenReturn(messages);
+ when(parser.validate(any(JSONObject.class))).thenReturn(true);
+ parserBolt.execute(tuple);
+ verify(writer, times(1)).write(eq(sensorType), any(Configurations.class), eq(tuple), eq(finalMessage1));
+ verify(outputCollector, times(1)).ack(tuple);
+ }
}