Posted to dev@metron.apache.org by mmiklavc <gi...@git.apache.org> on 2017/02/08 00:38:35 UTC
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
GitHub user mmiklavc opened a pull request:
https://github.com/apache/incubator-metron/pull/445
METRON-706: Add Stellar transformations and filters to enrichment and threat intel loaders
This PR completes work in https://issues.apache.org/jira/browse/METRON-706
(Note: this branch includes commits from @cestella that I merged while working on this. They are squashed in master but still appear here, in the commit history only, not in the diff.)
The motivation for this PR is to expand where we expose Stellar capabilities. This work enables transformations and filtering in the enrichment and threat intel extractors. Users can now specify transformation expressions on column values and, separately, filter records with a provided predicate. The same can be done independently for the indicator value used in the HBase key. In addition, a new `zk_quorum` configuration property lets a user specify a ZooKeeper quorum so that expressions can reference properties from the global config.
See the updated README for documentation details on the new properties.
**Testing**
Testing closely follows the methods described in [#432](https://github.com/apache/incubator-metron/pull/432#issuecomment-276733075)
* Download the Alexa top 1m data set
```
wget http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
unzip top-1m.csv.zip
```
* Stage the import files
```
head -n 10000 top-1m.csv > top-10k.csv
head -n 10 top-1m.csv > top-10.csv
```
* Create extractor.json for the CSV data with the contents below. (Set `zk_quorum` to your own value if it differs from the default Vagrant quick-dev environment.)
```
{
"config" : {
"zk_quorum" : "node1:2181",
"columns" : {
"rank" : 0,
"domain" : 1
},
"value_transform" : {
"domain" : "DOMAIN_REMOVE_TLD(domain)",
"port" : "es.port"
},
"value_filter" : "LENGTH(domain) > 0",
"indicator_column" : "domain",
"indicator_transform" : {
"indicator" : "DOMAIN_REMOVE_TLD(indicator)"
},
"indicator_filter" : "LENGTH(indicator) > 0",
"type" : "top_domains",
"separator" : ","
},
"extractor" : "CSV"
}
```
The `port` entry references the `es.port` property from the global config.
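For the `port` entry to resolve, a transform expression has to be evaluated against more than the record's own columns. Below is a minimal Java sketch of layered variable lookup, assuming the record's fields are consulted before the global config (the quoted diff does not show the actual order). `LayeredResolver` is a hypothetical class, not Metron's `MapVariableResolver`, and the global config contents in `main` are assumed.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: resolve a variable from the first map that contains it. */
public class LayeredResolver {
    private final List<Map<String, Object>> layers;

    @SafeVarargs
    public LayeredResolver(Map<String, Object>... layers) {
        this.layers = Arrays.asList(layers);
    }

    /** Returns the value from the first layer containing the name, or null if none does. */
    public Object resolve(String name) {
        for (Map<String, Object> layer : layers) {
            if (layer.containsKey(name)) {
                return layer.get(name);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("rank", "5");
        record.put("domain", "yahoo.com");

        // Assumed global config contents for illustration.
        Map<String, Object> globalConfig = new HashMap<>();
        globalConfig.put("es.port", "9300");

        LayeredResolver resolver = new LayeredResolver(record, globalConfig);
        System.out.println(resolver.resolve("domain"));  // yahoo.com (from the record)
        System.out.println(resolver.resolve("es.port")); // 9300 (from the global config)
    }
}
```

Putting the record first means a column named like a global property would shadow it; the opposite order is equally plausible, which is why the order is flagged as an assumption.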
* Run the import (parallelism of 5, batch size of 128)
```
echo "truncate 'enrichment'" | hbase shell && /usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-10k.csv -t enrichment -c t -e ./extractor.json -p 5 -b 128 && echo "count 'enrichment'" | hbase shell
```
You should see 9275 records in HBase, fewer than the 10k you might expect.
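The `-p 5` and `-b 128` flags set the number of loader threads and the number of records per write batch. The following Java sketch shows that batching pattern with a fixed thread pool. `BatchedLoader` and `writeBatch` are hypothetical, `writeBatch` only counts records instead of writing to HBase, and this is not the loader's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of batched, multithreaded loading (not the actual loader code). */
public class BatchedLoader {
    private final AtomicLong written = new AtomicLong();

    /** Placeholder for a batched HBase write; here it only counts records. */
    private void writeBatch(List<String> batch) {
        written.addAndGet(batch.size());
    }

    public long load(List<String> lines, int parallelism, int batchSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int start = 0; start < lines.size(); start += batchSize) {
                // Copy the slice so each task owns its batch instead of sharing a subList view.
                List<String> batch = new ArrayList<>(
                        lines.subList(start, Math.min(start + batchSize, lines.size())));
                futures.add(pool.submit(() -> writeBatch(batch)));
            }
            for (Future<?> f : futures) {
                f.get(); // propagate any failure from a worker
            }
        } finally {
            pool.shutdown();
        }
        return written.get();
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = new ArrayList<>();
        for (int i = 1; i <= 10000; i++) {
            lines.add(i + ",example" + i + ".com");
        }
        // 10000 lines at batch size 128 gives 79 batches spread over 5 threads.
        System.out.println(new BatchedLoader().load(lines, 5, 128)); // 10000
    }
}
```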
* Now run it again on the top-10 set.
```
echo "truncate 'enrichment'" | hbase shell && /usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-10.csv -t enrichment -c t -e ./extractor.json -p 5 -b 128 && echo "count 'enrichment'" | hbase shell
```
You should get 9 records:
```
scan 'enrichment'
ROW COLUMN+CELL
\x09\x00\x0F,\x10\xE5\xD1\xDE_\xBF\x9E\xA7d\xF2\xA8\x94\x00\x0Btop_domains\x00\x05yahoo column=t:v, timestamp=1486513090953, value={"port":"9300","domain":"yahoo","rank":"5"}
\x11\xCA\xCF\x01\xB4\xC5\x11@\x0C\xA1A,\xE9j~O\x00\x0Btop_domains\x00\x05tmall column=t:v, timestamp=1486513090979, value={"port":"9300","domain":"tmall","rank":"10"}
\x13)`\xFC\xF2\xBF\xF9\xC1a\xC8a\xF1h\x0E\xB5\x11\x00\x0Btop_domains\x00\x07youtube column=t:v, timestamp=1486513090930, value={"port":"9300","domain":"youtube","rank":"2"}
1\xC2I\x05k\xEA\x0EY\xE1\xAD\xA0$U\xA9kc\x00\x0Btop_domains\x00\x06google column=t:v, timestamp=1486513090964, value={"port":"9300","domain":"google","rank":"7"}
=\xDD\xDFH\x95\xC0\xB9\xD9\xBAKX\x8B\x9B2T\x9F\x00\x0Btop_domains\x00\x08facebook column=t:v, timestamp=1486513090942, value={"port":"9300","domain":"facebook","rank":"3"}
D\xDE\x1C\x9A\xCF\x07S\x9A\xDEB\xDB\x87D\x1F\x1D\xF4\x00\x0Btop_domains\x00\x02qq column=t:v, timestamp=1486513090974, value={"port":"9300","domain":"qq","rank":"9"}
u\xBC\xFC\xC9\x09\x9Af\xE1\xC8\xA5\x9A\x93\xCB0c\x01\x00\x0Btop_domains\x00\x06amazon column=t:v, timestamp=1486513090970, value={"port":"9300","domain":"amazon","rank":"8"}
\xC7\xA5.l\xC21\xFAQ8\x1E\x5C\x99p\x93_\x9A\x00\x0Btop_domains\x00\x09wikipedia column=t:v, timestamp=1486513090958, value={"port":"9300","domain":"wikipedia","rank":"6"}
\xCC\xCA\xBF;\x92\xA1\xA0k\xE4\x83i\xBD\xC3\xA8\xE8p\x00\x0Btop_domains\x00\x05baidu column=t:v, timestamp=1486513090948, value={"port":"9300","domain":"baidu","rank":"4"}
```
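Once again, we get fewer records than the input contains, because multiple records map to the same HBase key once the indicator transform has been applied. For example, rank 1 (google.com) is missing and `google` carries rank 7: the rank 7 domain also reduces to the indicator `google`, so only one of the two records survives under that key.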
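To see where the missing rows go, here is a minimal Java sketch of the configured steps (value transform, value filter, indicator transform, indicator filter) followed by a keyed write. It rests on several assumptions: `removeTld` is a hypothetical stand-in for `DOMAIN_REMOVE_TLD` that only knows three suffixes, the indicator is assumed to come from the raw `domain` column, a `type:indicator` string replaces the real row key (which, judging by the scan output, is a hash prefix followed by the length-prefixed type and indicator), a map stands in for the HBase table, and the input rows are illustrative.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: transform, filter, then write records keyed by (type, indicator). */
public class KeyCollisionDemo {
    // Stand-in for DOMAIN_REMOVE_TLD: strips one of a few hard-coded suffixes.
    private static final List<String> SUFFIXES = Arrays.asList(".co.in", ".com", ".org");

    static String removeTld(String domain) {
        for (String suffix : SUFFIXES) {
            if (domain.endsWith(suffix)) {
                return domain.substring(0, domain.length() - suffix.length());
            }
        }
        return domain;
    }

    /** Returns the surviving rows; a later record with the same key replaces an earlier one. */
    static Map<String, Map<String, String>> load(List<String[]> rows, String type) {
        Map<String, Map<String, String>> table = new LinkedHashMap<>();
        for (String[] row : rows) {
            Map<String, String> value = new LinkedHashMap<>();
            value.put("rank", row[0]);
            value.put("domain", removeTld(row[1]));   // value_transform
            if (value.get("domain").isEmpty()) {      // value_filter: LENGTH(domain) > 0
                continue;
            }
            String indicator = removeTld(row[1]);     // indicator_transform on the raw column (assumed)
            if (indicator.isEmpty()) {                // indicator_filter: LENGTH(indicator) > 0
                continue;
            }
            table.put(type + ":" + indicator, value); // same key: overwrite
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"1", "google.com"},
                new String[] {"2", "youtube.com"},
                new String[] {"3", "google.co.in"});
        Map<String, Map<String, String>> table = load(rows, "top_domains");
        System.out.println(table.size());                    // 2
        System.out.println(table.get("top_domains:google")); // {rank=3, domain=google}
    }
}
```

As in the scan above, the record written last under a shared key is the one that remains; with a parallel load, which record that is may depend on write order.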
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mmiklavc/incubator-metron top-domains-merge
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-metron/pull/445.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #445
----
commit 64a2fc6ee1190776bcbb46ecf6841b58ce2bf311
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-01-25T21:38:08Z
save some work and notes
commit a6a6ab64e2777610ff57727195d3ce0d2c2c8cb1
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-01-27T14:25:54Z
Extraction done
commit 47d814ef95d67738d20ce5dc530ba7b05d418a96
Author: cstella <ce...@gmail.com>
Date: 2017-01-27T23:15:44Z
Multithreading the SimpleEnrichmentFlatFileLoader
commit 918d4ce4aea5d7dfde992f32bf049c70f35dd182
Author: cstella <ce...@gmail.com>
Date: 2017-01-27T23:23:19Z
doc changes.
commit c6ca3a86881eb77bc9598a61e3c0cf8280ccb03f
Author: cstella <ce...@gmail.com>
Date: 2017-01-27T23:39:56Z
Updating docs.
commit 8c9a79cdfa38ea2fbd161095d5e346147558ec5f
Author: cstella <ce...@gmail.com>
Date: 2017-01-28T03:36:31Z
Investigating integration tests.
commit 315bd181aa634290ab987441d81c28addb7952e2
Author: cstella <ce...@gmail.com>
Date: 2017-01-28T04:09:28Z
Update integration test to be a proper integration test.
commit 004c6f41b6c1cc3ecea70513e1a468501bd32e3c
Author: cstella <ce...@gmail.com>
Date: 2017-01-28T04:49:37Z
Adding spliterator unit test for completeness
commit f8dd48ef920c948e1fc5ff736e386f641e551b2b
Author: cstella <ce...@gmail.com>
Date: 2017-01-28T05:01:42Z
Updating test to use a proper file
commit 9b04f9723d442c8f4fb7a8bcaa1d733fc1305dc4
Author: cstella <ce...@gmail.com>
Date: 2017-01-28T05:17:12Z
Updating docs and renaming a few things.
commit eb5b82cc35bd767a169f548ea8144dd9ae165f84
Author: cstella <ce...@gmail.com>
Date: 2017-01-28T05:23:25Z
Update one more test case.
commit 81c42afa2ff619ca23bfa5ec546c94ee8d6063e5
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-01-30T16:09:52Z
partial commit - adding additional filter and transform for indicator
commit 310c98bd946b2fdb320193cce85d368f016bf8c3
Author: cstella <ce...@gmail.com>
Date: 2017-01-30T20:36:23Z
Merge branch 'master' into unified_loader
commit 3f6e3ba4f30e41c94ff25027f1fd7c839ea6c9bf
Author: cstella <ce...@gmail.com>
Date: 2017-01-31T15:39:03Z
Updating simple enrichment flat file loader to be complete.
commit 2bdaf419621704970159e75e202acfeb868c3571
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-01-31T20:16:10Z
Merge branch 'master' into top-domains
commit 79cfdb4fba5e82e9e170bfc77c7133e6646f9787
Author: cstella <ce...@gmail.com>
Date: 2017-01-31T22:12:05Z
Removing old threatintel_bulk_load.sh script and integrating into the flatfile load script
commit bf7756b52e66907ca23a576ba9be9ab40b33f77d
Author: cstella <ce...@gmail.com>
Date: 2017-01-31T22:22:17Z
Forgot licenses.
commit e5729a296bdbef6d2d3ee87c69aade396708f47d
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-02-01T00:16:06Z
Merge with master. Get indicator transforms and filter working
commit a104f464e6b882121c7ab44079a5570d282c8457
Author: cstella <ce...@gmail.com>
Date: 2017-02-01T00:28:46Z
updating script.
commit b121e13d892834865847ddd806cbf10da63fa44e
Author: cstella <ce...@gmail.com>
Date: 2017-02-01T00:34:28Z
Merge branch 'master' into unified_loader
commit b5a9e5a9243576b27d59e959dfab3e99d34eb761
Author: cstella <ce...@gmail.com>
Date: 2017-02-01T00:57:02Z
Added gzip and zip to regular files
commit 323267ddfb52ab1aa7488e02643a8158044797e2
Author: cstella <ce...@gmail.com>
Date: 2017-02-01T15:04:53Z
Fixed stupid zip issue.
commit bc26b5b3992b91097bb4fc4b214d4b6bacaddfbb
Author: cstella <ce...@gmail.com>
Date: 2017-02-01T16:27:58Z
Updating readme and making progress bar optional and better.
commit 6cdf35d94f72be7da524fd5f854876f131ddb9f9
Author: cstella <ce...@gmail.com>
Date: 2017-02-01T17:39:59Z
updating tests to include gzip and zip
commit fd718bffa5e97f2c5c510b38d6a6d3812aefbed9
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-02-01T18:57:04Z
Refactor
commit d24f0c974d27e3861cb431c48efb3380a372e58b
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-02-02T19:03:56Z
Get unit test for extractor decorator working
commit d9bb54ec27a0f3282d28ba40d043f0045c167a54
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-02-02T21:47:08Z
Add negative test cases. Refactor options as enum in extractor decorator
commit 43c09c810c7d7cfa05cffa4609edab7ba2f24492
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-02-03T18:10:34Z
Intermediate commit - need to fetch from PR432
commit eafc786250d9b8e6283bd71c91bbd270ba4d1311
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-02-03T18:52:03Z
Get integration tests for flat file loader working with my branch. Fix trampled commit for ExtractorHandler
commit ad1aef760948109565b7144479151312ebccc24d
Author: Michael Miklavcic <mi...@gmail.com>
Date: 2017-02-03T19:46:05Z
Get integration tests working for Stellar transformations in the file loader
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/445#discussion_r100131091
--- Diff: metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java ---
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.MapVariableResolver;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.stellar.StellarPredicateProcessor;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*;
+
+public class TransformFilterExtractorDecorator extends ExtractorDecorator {
+ private static final Logger LOG = Logger.getLogger(TransformFilterExtractorDecorator.class);
+
+ protected enum ExtractorOptions {
+ VALUE_TRANSFORM("value_transform"),
+ VALUE_FILTER("value_filter"),
+ INDICATOR_TRANSFORM("indicator_transform"),
+ INDICATOR_FILTER("indicator_filter"),
+ ZK_QUORUM("zk_quorum"),
+ INDICATOR("indicator");
+
+ private String key;
+
+ ExtractorOptions(String key) {
+ this.key = key;
+ }
+
+ @Override
+ public String toString() {
+ return key;
+ }
+
+ public boolean existsIn(Map<String, Object> config) {
+ return config.containsKey(key);
+ }
+ }
+
+ private Optional<CuratorFramework> zkClient;
+ private Map<String, String> valueTransforms;
+ private Map<String, String> indicatorTransforms;
+ private String valueFilter;
+ private String indicatorFilter;
+ private Context stellarContext;
+ private StellarProcessor transformProcessor;
+ private StellarPredicateProcessor filterProcessor;
+ private Map<String, Object> globalConfig;
+
+ public TransformFilterExtractorDecorator(Extractor decoratedExtractor) {
+ super(decoratedExtractor);
+ this.zkClient = Optional.empty();
+ this.valueTransforms = new LinkedHashMap<>();
+ this.indicatorTransforms = new LinkedHashMap<>();
+ this.valueFilter = "";
+ this.indicatorFilter = "";
+ this.transformProcessor = new StellarProcessor();
+ this.filterProcessor = new StellarPredicateProcessor();
+ }
+
+ @Override
+ public void initialize(Map<String, Object> config) {
+ super.initialize(config);
+ if (VALUE_TRANSFORM.existsIn(config)) {
+ this.valueTransforms = getTransforms(config, VALUE_TRANSFORM.toString());
+ }
+ if (INDICATOR_TRANSFORM.existsIn(config)) {
+ this.indicatorTransforms = getTransforms(config, INDICATOR_TRANSFORM.toString());
+ }
+ if (VALUE_FILTER.existsIn(config)) {
+ this.valueFilter = getFilter(config, VALUE_FILTER.toString());
+ }
+ if (INDICATOR_FILTER.existsIn(config)) {
+ this.indicatorFilter = getFilter(config, INDICATOR_FILTER.toString());
+ }
+ String zkClientUrl = "";
+ if (ZK_QUORUM.existsIn(config)) {
+ zkClientUrl = ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class);
+ }
+ zkClient = setupClient(zkClient, zkClientUrl);
+ this.globalConfig = getGlobalConfig(zkClient);
+ this.stellarContext = createContext(zkClient);
+ StellarFunctions.initialize(stellarContext);
+ this.transformProcessor = new StellarProcessor();
+ this.filterProcessor = new StellarPredicateProcessor();
+ }
+
+ private String getFilter(Map<String, Object> config, String valueFilter) {
+ return ConversionUtils.convertOrFail(config.get(valueFilter), String.class);
+ }
+
+ /**
+ * Get a map of the transformations from the config of the specified type
+ * @param config main config map
+ * @param type the transformation type to get from config
+ * @return map of transformations.
+ */
+ private Map<String, String> getTransforms(Map<String, Object> config, String type) {
+ Map<Object, Object> transformsConfig = ConversionUtils.convertOrFail(config.get(type), Map.class);
+ Map<String, String> transforms = new LinkedHashMap<>();
+ for (Map.Entry<Object, Object> e : transformsConfig.entrySet()) {
+ String key = ConversionUtils.convertOrFail(e.getKey(), String.class);
+ String val = ConversionUtils.convertOrFail(e.getValue(), String.class);
+ transforms.put(key, val);
+ }
+ return transforms;
+ }
+
+ /**
+ * Creates a Zookeeper client if it doesn't exist and a url for zk is provided.
+ * @param zookeeperUrl The Zookeeper URL.
+ */
+ private Optional<CuratorFramework> setupClient(Optional<CuratorFramework> zkClient, String zookeeperUrl) {
+ // can only create client if we have a valid zookeeper URL
+ if (!zkClient.isPresent()) {
+ if (StringUtils.isNotBlank(zookeeperUrl)) {
+ CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
+ client.start();
+ return Optional.of(client);
+ } else {
+ LOG.warn("Unable to setup zookeeper client - zk_quorum url not provided. **This will limit some Stellar functionality**");
+ return Optional.empty();
+ }
+ } else {
+ return zkClient;
+ }
+ }
+
+ private Map<String, Object> getGlobalConfig(Optional<CuratorFramework> zkClient) {
+ if (zkClient.isPresent()) {
+ try {
+ return JSONUtils.INSTANCE.load(
+ new ByteArrayInputStream(ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(zkClient.get())),
+ new TypeReference<Map<String, Object>>() {
+ });
+ } catch (Exception e) {
+ LOG.warn("Exception thrown while attempting to get global config from Zookeeper.", e);
+ }
+ }
+ return new LinkedHashMap<>();
+ }
+
+ private Context createContext(Optional<CuratorFramework> zkClient) {
+ Context.Builder builder = new Context.Builder();
+ if (zkClient.isPresent()) {
+ builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
--- End diff --
I mean `{}`
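The `{}` suggestion concerns what the `GLOBAL_CONFIG` capability should supply when no global config is available. The Java sketch below contrasts three cases from a consumer's point of view: an empty map, a null map, and a capability that was never registered. `lookup` and the `Optional`-wrapped supplier are hypothetical stand-ins, not Stellar's `Context` API, so this says nothing definitive about how Stellar itself treats each case.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical sketch: why supplying {} is safer for consumers than supplying null. */
public class GlobalConfigSupplierDemo {
    /** Mimics a function that reads a key from a global-config capability. */
    static Object lookup(Optional<Supplier<Map<String, Object>>> capability, String key) {
        if (!capability.isPresent()) {
            // Capability never registered: the caller has to handle this case explicitly.
            throw new IllegalStateException("GLOBAL_CONFIG capability not available");
        }
        Map<String, Object> config = capability.get().get();
        return config.get(key); // NullPointerException if the supplier returned null
    }

    public static void main(String[] args) {
        Supplier<Map<String, Object>> emptyConfig = HashMap::new;
        System.out.println(lookup(Optional.of(emptyConfig), "es.port")); // null: missing key, no exception

        Supplier<Map<String, Object>> nullConfig = () -> null;
        try {
            lookup(Optional.of(nullConfig), "es.port");
        } catch (NullPointerException e) {
            System.out.println("null config: NullPointerException");
        }

        try {
            lookup(Optional.empty(), "es.port");
        } catch (IllegalStateException e) {
            System.out.println("no capability: " + e.getMessage());
        }
    }
}
```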
---
[GitHub] incubator-metron issue #445: METRON-706: Add Stellar transformations and fil...
Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on the issue:
https://github.com/apache/incubator-metron/pull/445
Note: Given the recent Ansible issue in master, I also tested the following:
* Create threat_ip.csv
```
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#Add single column of ip address to alert
#Public lists are available on the internet
# example:
23.113.113.105
24.107.205.249
24.108.62.255
24.224.153.71
27.4.1.212
27.131.149.102
31.24.30.31
31.131.251.33
31.186.99.250
31.192.209.119
31.192.209.150
31.200.244.17
37.34.52.185
37.58.112.101
37.99.146.27
37.128.132.96
37.140.195.177
37.140.199.100
```
* Upload threat_ip.csv to HDFS:
```
hdfs dfs -put -f threat_ip.csv
```
* Create extractor.json
```
{
"config": {
"columns": {
"ip": 0
},
"indicator_column": "ip",
"type" : "malicious_ip",
"separator": ","
},
"extractor": "CSV"
}
```
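With this config, each non-comment line of threat_ip.csv becomes one record whose indicator is the `ip` column. A minimal Java sketch of that mapping follows. It assumes that lines beginning with `#` are skipped (the commented header in threat_ip.csv relies on this) and that a record's value map holds the configured columns. `CsvLineSketch` and `extract` are hypothetical names, not Metron's CSV extractor API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical sketch of mapping one CSV line to (indicator, values). */
public class CsvLineSketch {
    static Optional<Map.Entry<String, Map<String, String>>> extract(
            String line, Map<String, Integer> columns, String indicatorColumn, String separator) {
        String trimmed = line.trim();
        if (trimmed.isEmpty() || trimmed.startsWith("#")) {
            return Optional.empty(); // comment or blank line: no record
        }
        // Quote the separator so characters like '|' are not treated as regex syntax.
        String[] tokens = trimmed.split(Pattern.quote(separator), -1);
        Map<String, String> values = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> col : columns.entrySet()) {
            values.put(col.getKey(), tokens[col.getValue()].trim());
        }
        Map.Entry<String, Map<String, String>> record =
                new SimpleEntry<>(values.get(indicatorColumn), values);
        return Optional.of(record);
    }

    public static void main(String[] args) {
        Map<String, Integer> columns = new LinkedHashMap<>();
        columns.put("ip", 0);
        System.out.println(extract("# example:", columns, "ip", ",").isPresent());            // false
        System.out.println(extract("23.113.113.105", columns, "ip", ",").get().getKey());    // 23.113.113.105
    }
}
```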
* Run as the root user from /root:
```
echo "truncate 'threatintel'" | hbase shell && /usr/metron/0.3.0/bin/flatfile_loader.sh -c t -t threatintel -e extractor.json -i /user/root -m MR
```
* Verify that the records were loaded:
```
echo "scan 'threatintel'" | hbase shell
```
---
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/445#discussion_r100072226
--- Diff: metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java ---
@@ -35,6 +35,14 @@ protected ConvertUtilsBean initialValue() {
}
};
+ public static <T> T convertOrFail(Object o, Class<T> clazz) {
+ if (clazz.isInstance(o)) {
--- End diff --
Any reason why this isn't just `clazz.cast(o)` and called `cast`?
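The suggestion is that an `isInstance` check plus a failure path can be collapsed into `Class.cast`, which returns the object when it is null or an instance of the class and throws `ClassCastException` otherwise. A minimal Java sketch of such a `cast` helper follows; the name comes from the review comment, the config values in `main` are hypothetical, and the code is illustrative rather than the committed change.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper along the lines suggested in the review. */
public final class CastSketch {
    private CastSketch() {}

    /** Returns o as a T; null passes through, a non-null object of the wrong type throws ClassCastException. */
    public static <T> T cast(Object o, Class<T> clazz) {
        return clazz.cast(o);
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("value_filter", "LENGTH(domain) > 0");
        config.put("zk_quorum", 2181); // deliberately the wrong type

        String filter = cast(config.get("value_filter"), String.class);
        System.out.println(filter); // LENGTH(domain) > 0

        try {
            cast(config.get("zk_quorum"), String.class);
        } catch (ClassCastException e) {
            System.out.println("zk_quorum is not a String");
        }
    }
}
```

One behavioral difference to keep in mind: `clazz.isInstance(null)` is false, while `clazz.cast(null)` returns null, so the two approaches treat a missing config key differently.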
---
[GitHub] incubator-metron issue #445: METRON-706: Add Stellar transformations and fil...
Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on the issue:
https://github.com/apache/incubator-metron/pull/445
Adding these timing notes about the import for reference:
**No filter, local load, multiple threads (5), batch 128**
real 10m22.127s
user 11m11.873s
sys 4m23.912s
903392 rows
**With filters, multiple threads (5), batch 128 (1 fewer record)**
real 10m58.210s
user 11m10.592s
sys 4m16.585s
903391 rows
**MapReduce mode**
real 9m20.566s
user 0m26.853s
sys 0m10.334s
903391 rows
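Converting the wall-clock (`real`) times to rows per second makes the three runs easier to compare. A small Java sketch of that arithmetic, parsing the `XmY.Zs` format printed by `time` (`LoadThroughput` is a hypothetical name):

```java
/** Converts the `time` output above into rows per second of wall-clock time. */
public class LoadThroughput {
    /** Parses a duration such as "10m22.127s" into seconds. */
    static double seconds(String t) {
        int m = t.indexOf('m');
        double minutes = Double.parseDouble(t.substring(0, m));
        double secs = Double.parseDouble(t.substring(m + 1, t.length() - 1)); // drop trailing 's'
        return minutes * 60 + secs;
    }

    static double rowsPerSecond(long rows, String real) {
        return rows / seconds(real);
    }

    public static void main(String[] args) {
        System.out.printf("local, no filter:   %.0f rows/s%n", rowsPerSecond(903392, "10m22.127s"));
        System.out.printf("local, with filter: %.0f rows/s%n", rowsPerSecond(903391, "10m58.210s"));
        System.out.printf("MapReduce:          %.0f rows/s%n", rowsPerSecond(903391, "9m20.566s"));
    }
}
```

This works out to roughly 1,450, 1,370, and 1,610 rows per second. The low `user` and `sys` times in MapReduce mode most likely reflect that the work runs in cluster tasks rather than in the timed client process.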
---
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/445#discussion_r100129984
--- Diff: metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java ---
@@ -35,6 +35,14 @@ protected ConvertUtilsBean initialValue() {
}
};
+ public static <T> T convertOrFail(Object o, Class<T> clazz) {
+ if (clazz.isInstance(o)) {
--- End diff --
Good point. I'm removing this entirely and will just cast in place, e.g.:
**Example 1**
`String val = (String) map.get("foo"); // throws ClassCastException on failure, which is what we want`
**Example 2**
```
Map<Object, Object> a = new HashMap() {{
put("hello", "world");
put(1, 2);
}};
Map<String, Object> b = new HashMap() {{
put("a", a);
}};
Map<Object, Object> c = (Map) b.get("a"); // throws ClassCastException if not a Map
Map<String, String> d = new HashMap<>();
for (Map.Entry<Object, Object> entry : c.entrySet()) {
d.put((String) entry.getKey(), (String) entry.getValue()); // throws ClassCastException on the 1 -> 2 entry, which is also what we want
}
```
---
[GitHub] incubator-metron issue #445: METRON-706: Add Stellar transformations and fil...
Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:
https://github.com/apache/incubator-metron/pull/445
+1 by inspection
---
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/445#discussion_r100087723
--- Diff: metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java ---
@@ -200,6 +295,7 @@ public static void teardown() throws Exception {
multilineZipFile.delete();
lineByLineExtractorConfigFile.delete();
wholeFileExtractorConfigFile.delete();
+ stellarExtractorConfigFile.delete();
--- End diff --
Good catch, fixing
---
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/445#discussion_r100130952
--- Diff: metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java ---
@@ -0,0 +1,236 @@
+ private Context createContext(Optional<CuratorFramework> zkClient) {
+ Context.Builder builder = new Context.Builder();
+ if (zkClient.isPresent()) {
+ builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
--- End diff --
By empty, do you mean null or "{}"? Does Stellar handle that differently from choosing not to add the capability at all?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/incubator-metron/pull/445
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/445#discussion_r100074335
--- Diff: metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java ---
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.extractor;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.log4j.Logger;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.MapVariableResolver;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.stellar.StellarPredicateProcessor;
+import org.apache.metron.common.stellar.StellarProcessor;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.enrichment.lookup.LookupKV;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*;
+
+public class TransformFilterExtractorDecorator extends ExtractorDecorator {
+ private static final Logger LOG = Logger.getLogger(TransformFilterExtractorDecorator.class);
+
+ protected enum ExtractorOptions {
+ VALUE_TRANSFORM("value_transform"),
+ VALUE_FILTER("value_filter"),
+ INDICATOR_TRANSFORM("indicator_transform"),
+ INDICATOR_FILTER("indicator_filter"),
+ ZK_QUORUM("zk_quorum"),
+ INDICATOR("indicator");
+
+ private String key;
+
+ ExtractorOptions(String key) {
+ this.key = key;
+ }
+
+ @Override
+ public String toString() {
+ return key;
+ }
+
+ public boolean existsIn(Map<String, Object> config) {
+ return config.containsKey(key);
+ }
+ }
+
+ private Optional<CuratorFramework> zkClient;
+ private Map<String, String> valueTransforms;
+ private Map<String, String> indicatorTransforms;
+ private String valueFilter;
+ private String indicatorFilter;
+ private Context stellarContext;
+ private StellarProcessor transformProcessor;
+ private StellarPredicateProcessor filterProcessor;
+ private Map<String, Object> globalConfig;
+
+ public TransformFilterExtractorDecorator(Extractor decoratedExtractor) {
+ super(decoratedExtractor);
+ this.zkClient = Optional.empty();
+ this.valueTransforms = new LinkedHashMap<>();
+ this.indicatorTransforms = new LinkedHashMap<>();
+ this.valueFilter = "";
+ this.indicatorFilter = "";
+ this.transformProcessor = new StellarProcessor();
+ this.filterProcessor = new StellarPredicateProcessor();
+ }
+
+ @Override
+ public void initialize(Map<String, Object> config) {
+ super.initialize(config);
+ if (VALUE_TRANSFORM.existsIn(config)) {
+ this.valueTransforms = getTransforms(config, VALUE_TRANSFORM.toString());
+ }
+ if (INDICATOR_TRANSFORM.existsIn(config)) {
+ this.indicatorTransforms = getTransforms(config, INDICATOR_TRANSFORM.toString());
+ }
+ if (VALUE_FILTER.existsIn(config)) {
+ this.valueFilter = getFilter(config, VALUE_FILTER.toString());
+ }
+ if (INDICATOR_FILTER.existsIn(config)) {
+ this.indicatorFilter = getFilter(config, INDICATOR_FILTER.toString());
+ }
+ String zkClientUrl = "";
+ if (ZK_QUORUM.existsIn(config)) {
+ zkClientUrl = ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class);
+ }
+ zkClient = setupClient(zkClient, zkClientUrl);
+ this.globalConfig = getGlobalConfig(zkClient);
+ this.stellarContext = createContext(zkClient);
+ StellarFunctions.initialize(stellarContext);
+ this.transformProcessor = new StellarProcessor();
+ this.filterProcessor = new StellarPredicateProcessor();
+ }
+
+ private String getFilter(Map<String, Object> config, String valueFilter) {
+ return ConversionUtils.convertOrFail(config.get(valueFilter), String.class);
+ }
+
+ /**
+ * Get a map of the transformations from the config of the specified type
+ * @param config main config map
+ * @param type the transformation type to get from config
+ * @return map of transformations.
+ */
+ private Map<String, String> getTransforms(Map<String, Object> config, String type) {
+ Map<Object, Object> transformsConfig = ConversionUtils.convertOrFail(config.get(type), Map.class);
+ Map<String, String> transforms = new LinkedHashMap<>();
+ for (Map.Entry<Object, Object> e : transformsConfig.entrySet()) {
+ String key = ConversionUtils.convertOrFail(e.getKey(), String.class);
+ String val = ConversionUtils.convertOrFail(e.getValue(), String.class);
+ transforms.put(key, val);
+ }
+ return transforms;
+ }
+
+ /**
+ * Creates a Zookeeper client if it doesn't exist and a url for zk is provided.
+ * @param zookeeperUrl The Zookeeper URL.
+ */
+ private Optional<CuratorFramework> setupClient(Optional<CuratorFramework> zkClient, String zookeeperUrl) {
+ // can only create client if we have a valid zookeeper URL
+ if (!zkClient.isPresent()) {
+ if (StringUtils.isNotBlank(zookeeperUrl)) {
+ CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
+ client.start();
+ return Optional.of(client);
+ } else {
+ LOG.warn("Unable to setup zookeeper client - zk_quorum url not provided. **This will limit some Stellar functionality**");
+ return Optional.empty();
+ }
+ } else {
+ return zkClient;
+ }
+ }
+
+ private Map<String, Object> getGlobalConfig(Optional<CuratorFramework> zkClient) {
+ if (zkClient.isPresent()) {
+ try {
+ return JSONUtils.INSTANCE.load(
+ new ByteArrayInputStream(ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(zkClient.get())),
+ new TypeReference<Map<String, Object>>() {
+ });
+ } catch (Exception e) {
+ LOG.warn("Exception thrown while attempting to get global config from Zookeeper.", e);
+ }
+ }
+ return new LinkedHashMap<>();
+ }
+
+ private Context createContext(Optional<CuratorFramework> zkClient) {
+ Context.Builder builder = new Context.Builder();
+ if (zkClient.isPresent()) {
+ builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
--- End diff --
You might want an empty global config even if the `zkClient` isn't present. Not sure, just a thought.
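Here is a minimal sketch of that suggestion. The ZooKeeper client capability stays conditional, but the global config is always registered, falling back to an empty map. It assumes capabilities can be modeled as a map of named suppliers. `AlwaysGlobalConfigSketch`, the string keys, and the use of a `String` in place of a `CuratorFramework` client are hypothetical stand-ins, not Metron's `Context.Builder` API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
 * Hypothetical sketch: register ZOOKEEPER_CLIENT only when a client exists,
 * but always register GLOBAL_CONFIG. The client is modeled as Optional<String>
 * and capabilities as a map of suppliers; neither is Metron's real type.
 */
class AlwaysGlobalConfigSketch {
  static Map<String, Supplier<Object>> createContext(Optional<String> zkClient,
                                                      Map<String, Object> globalConfig) {
    Map<String, Supplier<Object>> capabilities = new LinkedHashMap<>();
    // Only a real client can be handed out, so this capability stays conditional.
    zkClient.ifPresent(client -> capabilities.put("ZOOKEEPER_CLIENT", () -> client));
    // Never null: fall back to an immutable empty map when nothing was loaded.
    Map<String, Object> config = globalConfig == null ? Collections.emptyMap() : globalConfig;
    capabilities.put("GLOBAL_CONFIG", () -> config);
    return capabilities;
  }

  public static void main(String[] args) {
    Map<String, Supplier<Object>> noZk = createContext(Optional.empty(), new LinkedHashMap<>());
    System.out.println(noZk.keySet()); // [GLOBAL_CONFIG]
    Map<String, Supplier<Object>> withZk =
        createContext(Optional.of("node1:2181"), Collections.singletonMap("k", (Object) "v"));
    System.out.println(withZk.keySet()); // [ZOOKEEPER_CLIENT, GLOBAL_CONFIG]
  }
}
```

In the diff, `getGlobalConfig` already returns a new empty `LinkedHashMap` when no client is present. Applied there, the change would amount to moving the `GLOBAL_CONFIG` registration outside the `if (zkClient.isPresent())` block.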
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/445#discussion_r100131179
--- Diff: metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java ---
@@ -35,6 +35,14 @@ protected ConvertUtilsBean initialValue() {
}
};
+ public static <T> T convertOrFail(Object o, Class<T> clazz) {
+ if (clazz.isInstance(o)) {
--- End diff --
Agreed
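The quoted hunk shows only the start of `convertOrFail`: the `clazz.isInstance(o)` check. The sketch below is a hypothetical fail-fast helper in that spirit. It assumes the method casts when the type already matches and otherwise throws. The real method may attempt a conversion before failing; that part is elided in the thread. In the decorator diff, transforms and filters go through `convertOrFail`, while `zk_quorum` uses the lenient `ConversionUtils.convert`.

```java
/**
 * Hypothetical fail-fast conversion helper. Only the isInstance check is
 * visible in the quoted diff; cast-or-throw is an assumption for illustration.
 */
class ConvertOrFailSketch {
  static <T> T convertOrFail(Object o, Class<T> clazz) {
    if (clazz.isInstance(o)) {
      // Class.cast is the checked equivalent of an unchecked (T) o cast.
      return clazz.cast(o);
    }
    throw new IllegalArgumentException("Expected " + clazz.getName() + " but got "
        + (o == null ? "null" : o.getClass().getName()));
  }

  public static void main(String[] args) {
    String s = convertOrFail("value_filter", String.class);
    System.out.println(s); // value_filter
    try {
      convertOrFail(42, String.class);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Expected java.lang.String but got java.lang.Integer
    }
  }
}
```

With a fail-fast helper, a malformed extractor config (for example, a non-string `value_filter`) surfaces during `initialize` rather than as a null deeper in processing.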
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...
Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:
https://github.com/apache/incubator-metron/pull/445#discussion_r100075392
--- Diff: metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java ---
@@ -200,6 +295,7 @@ public static void teardown() throws Exception {
multilineZipFile.delete();
lineByLineExtractorConfigFile.delete();
wholeFileExtractorConfigFile.delete();
+ stellarExtractorConfigFile.delete();
--- End diff --
Did you forget `customLineByLineExtractorConfigFile.delete();`?
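An alternative to adding each missed `delete()` by hand is to register every temp file as it is created and delete them all in teardown. `TempFileRegistry` is a hypothetical helper, not part of the Metron test code. JUnit 4's `TemporaryFolder` rule is another option for the same problem.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: track temp files at creation time so teardown
 * removes all of them instead of relying on a hand-maintained list.
 */
class TempFileRegistry {
  private final List<File> files = new ArrayList<>();

  File register(File f) {
    files.add(f);
    return f;
  }

  /** Deletes every registered file that still exists; returns how many deletions failed. */
  int deleteAll() {
    int failures = 0;
    for (File f : files) {
      // delete() returns false if the file could not be removed
      if (f.exists() && !f.delete()) {
        failures++;
      }
    }
    files.clear();
    return failures;
  }

  public static void main(String[] args) throws IOException {
    TempFileRegistry registry = new TempFileRegistry();
    File a = registry.register(File.createTempFile("lineByLineExtractorConfig", ".json"));
    File b = registry.register(File.createTempFile("customLineByLineExtractorConfig", ".json"));
    System.out.println(registry.deleteAll());     // 0
    System.out.println(a.exists() || b.exists()); // false
  }
}
```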
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---