Posted to dev@metron.apache.org by mmiklavc <gi...@git.apache.org> on 2017/02/08 00:38:35 UTC

[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

GitHub user mmiklavc opened a pull request:

    https://github.com/apache/incubator-metron/pull/445

    METRON-706: Add Stellar transformations and filters to enrichment and threat intel loaders

    This PR completes work in https://issues.apache.org/jira/browse/METRON-706
    
    (Note: this branch includes commits from @cestella that I merged while working on this. They are squashed in master, so they show up here only in the commit history, not in the diff.)
    
    The motivation for this PR is to expose Stellar in more places. This work enables transformations and filtering in the enrichment and threat intel extractors. The user can now specify transformation expressions on the column values and, separately, filter records with a predicate. The same can be done independently for the indicator value used as part of the HBase key. In addition, a new configuration property lets the user specify a Zookeeper quorum so that expressions can reference properties from the global config.
    
    See the updated README for documentation details on the new properties.
    
    **Testing**
    
    Testing follows closely with the methods defined in [#432](https://github.com/apache/incubator-metron/pull/432#issuecomment-276733075)
    
    * Download the Alexa top 1m data set
    ```
    wget http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
    unzip top-1m.csv.zip
    ```
    
    * Stage import file
    ```
    head -n 10000 top-1m.csv > top-10k.csv
    head -n 10 top-1m.csv > top-10.csv
    ```
    
    * Create an extractor.json for the CSV data with the following contents. (Set zk_quorum to your own value if it differs from the default Vagrant quick-dev environment.)
    ```
    {
      "config" : {
        "zk_quorum" : "node1:2181",
        "columns" : {
           "rank" : 0,
           "domain" : 1
        },
        "value_transform" : {
           "domain" : "DOMAIN_REMOVE_TLD(domain)",
           "port" : "es.port"
        },
        "value_filter" : "LENGTH(domain) > 0",
        "indicator_column" : "domain",
        "indicator_transform" : {
           "indicator" : "DOMAIN_REMOVE_TLD(indicator)"
        },
        "indicator_filter" : "LENGTH(indicator) > 0",
        "type" : "top_domains",
        "separator" : ","
      },
      "extractor" : "CSV"
    }
    ```
    
    The "port" entry under "value_transform" references "es.port" from the global config.
    
    * Run the import (parallelism of 5, batch size of 128)
    ```
    echo "truncate 'enrichment'" | hbase shell && /usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-10k.csv -t enrichment -c t -e ./extractor.json -p 5 -b 128 && echo "count 'enrichment'" | hbase shell
    ```
    
    You should see 9275 records in HBase, fewer than the 10k you might expect.
    
    * Now run it again on the top-10 set.
    ```
    echo "truncate 'enrichment'" | hbase shell && /usr/metron/0.3.0/bin/flatfile_loader.sh -i ./top-10.csv -t enrichment -c t -e ./extractor.json -p 5 -b 128 && echo "count 'enrichment'" | hbase shell
    ```
    
    You should get 9 values as below:
    ```
    scan 'enrichment'
    ROW                                                                     COLUMN+CELL
     \x09\x00\x0F,\x10\xE5\xD1\xDE_\xBF\x9E\xA7d\xF2\xA8\x94\x00\x0Btop_dom column=t:v, timestamp=1486513090953, value={"port":"9300","domain":"yahoo","rank":"5"}
     ains\x00\x05yahoo
     \x11\xCA\xCF\x01\xB4\xC5\x11@\x0C\xA1A,\xE9j~O\x00\x0Btop_domains\x00\ column=t:v, timestamp=1486513090979, value={"port":"9300","domain":"tmall","rank":"10"}
     x05tmall
     \x13)`\xFC\xF2\xBF\xF9\xC1a\xC8a\xF1h\x0E\xB5\x11\x00\x0Btop_domains\x column=t:v, timestamp=1486513090930, value={"port":"9300","domain":"youtube","rank":"2"}
     00\x07youtube
     1\xC2I\x05k\xEA\x0EY\xE1\xAD\xA0$U\xA9kc\x00\x0Btop_domains\x00\x06goo column=t:v, timestamp=1486513090964, value={"port":"9300","domain":"google","rank":"7"}
     gle
     =\xDD\xDFH\x95\xC0\xB9\xD9\xBAKX\x8B\x9B2T\x9F\x00\x0Btop_domains\x00\ column=t:v, timestamp=1486513090942, value={"port":"9300","domain":"facebook","rank":"3"}
     x08facebook
     D\xDE\x1C\x9A\xCF\x07S\x9A\xDEB\xDB\x87D\x1F\x1D\xF4\x00\x0Btop_domain column=t:v, timestamp=1486513090974, value={"port":"9300","domain":"qq","rank":"9"}
     s\x00\x02qq
     u\xBC\xFC\xC9\x09\x9Af\xE1\xC8\xA5\x9A\x93\xCB0c\x01\x00\x0Btop_domain column=t:v, timestamp=1486513090970, value={"port":"9300","domain":"amazon","rank":"8"}
     s\x00\x06amazon
     \xC7\xA5.l\xC21\xFAQ8\x1E\x5C\x99p\x93_\x9A\x00\x0Btop_domains\x00\x09 column=t:v, timestamp=1486513090958, value={"port":"9300","domain":"wikipedia","rank":"6"}
     wikipedia
     \xCC\xCA\xBF;\x92\xA1\xA0k\xE4\x83i\xBD\xC3\xA8\xE8p\x00\x0Btop_domain column=t:v, timestamp=1486513090948, value={"port":"9300","domain":"baidu","rank":"4"}
     s\x00\x05baidu
    ```
    
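    Once again, there are fewer rows than input records, because multiple records map to the same HBase key once the TLD is removed. In the scan above, rank 1 is absent and "google" carries rank 7, which suggests that two google domains collapsed into a single key.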
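
The collapse of distinct domains onto one key can be sketched in a few lines of Java. This is a minimal illustration, not Metron code: `stripSuffix` is a hypothetical stand-in for `DOMAIN_REMOVE_TLD` that simply cuts at the first dot, the rows are made-up examples rather than Alexa data, and a `LinkedHashMap` stands in for the HBase table.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyCollisionSketch {

    // Hypothetical stand-in for DOMAIN_REMOVE_TLD: keeps the text before the first dot.
    static String stripSuffix(String domain) {
        int dot = domain.indexOf('.');
        return dot < 0 ? domain : domain.substring(0, dot);
    }

    // Loads (rank, domain) rows keyed by the transformed domain. A later row with
    // the same key overwrites the earlier one, as a put to the same row key would.
    static Map<String, String> load(List<String[]> rows) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] row : rows) {
            String key = stripSuffix(row[1]);
            if (key.length() > 0) { // mirrors the "LENGTH(indicator) > 0" filter
                table.put(key, row[0]);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Made-up rows for illustration only.
        List<String[]> rows = Arrays.asList(
            new String[] {"1", "example.com"},
            new String[] {"2", "sample.org"},
            new String[] {"3", "example.net"});
        Map<String, String> table = load(rows);
        System.out.println(rows.size() + " rows in, " + table.size() + " keys out: " + table);
    }
}
```

Three input rows yield two keys here, and the `example` key keeps the rank of whichever row was written last.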

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mmiklavc/incubator-metron top-domains-merge

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-metron/pull/445.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #445
    
----
commit 64a2fc6ee1190776bcbb46ecf6841b58ce2bf311
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-01-25T21:38:08Z

    save some work and notes

commit a6a6ab64e2777610ff57727195d3ce0d2c2c8cb1
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-01-27T14:25:54Z

    Extraction done

commit 47d814ef95d67738d20ce5dc530ba7b05d418a96
Author: cstella <ce...@gmail.com>
Date:   2017-01-27T23:15:44Z

    Multithreading the SimpleEnrichmentFlatFileLoader

commit 918d4ce4aea5d7dfde992f32bf049c70f35dd182
Author: cstella <ce...@gmail.com>
Date:   2017-01-27T23:23:19Z

    doc changes.

commit c6ca3a86881eb77bc9598a61e3c0cf8280ccb03f
Author: cstella <ce...@gmail.com>
Date:   2017-01-27T23:39:56Z

    Updating docs.

commit 8c9a79cdfa38ea2fbd161095d5e346147558ec5f
Author: cstella <ce...@gmail.com>
Date:   2017-01-28T03:36:31Z

    Investigating integration tests.

commit 315bd181aa634290ab987441d81c28addb7952e2
Author: cstella <ce...@gmail.com>
Date:   2017-01-28T04:09:28Z

    Update integration test to be a proper integration test.

commit 004c6f41b6c1cc3ecea70513e1a468501bd32e3c
Author: cstella <ce...@gmail.com>
Date:   2017-01-28T04:49:37Z

    Adding spliterator unit test for completeness

commit f8dd48ef920c948e1fc5ff736e386f641e551b2b
Author: cstella <ce...@gmail.com>
Date:   2017-01-28T05:01:42Z

    Updating test to use a proper file

commit 9b04f9723d442c8f4fb7a8bcaa1d733fc1305dc4
Author: cstella <ce...@gmail.com>
Date:   2017-01-28T05:17:12Z

    Updating docs and renaming a few things.

commit eb5b82cc35bd767a169f548ea8144dd9ae165f84
Author: cstella <ce...@gmail.com>
Date:   2017-01-28T05:23:25Z

    Update one more test case.

commit 81c42afa2ff619ca23bfa5ec546c94ee8d6063e5
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-01-30T16:09:52Z

    partial commit - adding additional filter and transform for indicator

commit 310c98bd946b2fdb320193cce85d368f016bf8c3
Author: cstella <ce...@gmail.com>
Date:   2017-01-30T20:36:23Z

    Merge branch 'master' into unified_loader

commit 3f6e3ba4f30e41c94ff25027f1fd7c839ea6c9bf
Author: cstella <ce...@gmail.com>
Date:   2017-01-31T15:39:03Z

    Updating simple enrichment flat file loader to be complete.

commit 2bdaf419621704970159e75e202acfeb868c3571
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-01-31T20:16:10Z

    Merge branch 'master' into top-domains

commit 79cfdb4fba5e82e9e170bfc77c7133e6646f9787
Author: cstella <ce...@gmail.com>
Date:   2017-01-31T22:12:05Z

    Removing old threatintel_bulk_load.sh script and integrating into the flatfile load script

commit bf7756b52e66907ca23a576ba9be9ab40b33f77d
Author: cstella <ce...@gmail.com>
Date:   2017-01-31T22:22:17Z

    Forgot licenses.

commit e5729a296bdbef6d2d3ee87c69aade396708f47d
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-02-01T00:16:06Z

    Merge with master. Get indicator transforms and filter working

commit a104f464e6b882121c7ab44079a5570d282c8457
Author: cstella <ce...@gmail.com>
Date:   2017-02-01T00:28:46Z

    updating script.

commit b121e13d892834865847ddd806cbf10da63fa44e
Author: cstella <ce...@gmail.com>
Date:   2017-02-01T00:34:28Z

    Merge branch 'master' into unified_loader

commit b5a9e5a9243576b27d59e959dfab3e99d34eb761
Author: cstella <ce...@gmail.com>
Date:   2017-02-01T00:57:02Z

    Added gzip and zip to regular files

commit 323267ddfb52ab1aa7488e02643a8158044797e2
Author: cstella <ce...@gmail.com>
Date:   2017-02-01T15:04:53Z

    Fixed stupid zip issue.

commit bc26b5b3992b91097bb4fc4b214d4b6bacaddfbb
Author: cstella <ce...@gmail.com>
Date:   2017-02-01T16:27:58Z

    Updating readme and making progress bar optional and better.

commit 6cdf35d94f72be7da524fd5f854876f131ddb9f9
Author: cstella <ce...@gmail.com>
Date:   2017-02-01T17:39:59Z

    updating tests to include gzip and zip

commit fd718bffa5e97f2c5c510b38d6a6d3812aefbed9
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-02-01T18:57:04Z

    Refactor

commit d24f0c974d27e3861cb431c48efb3380a372e58b
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-02-02T19:03:56Z

    Get unit test for extractor decorator working

commit d9bb54ec27a0f3282d28ba40d043f0045c167a54
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-02-02T21:47:08Z

    Add negative test cases. Refactor options as enum in extractor decorator

commit 43c09c810c7d7cfa05cffa4609edab7ba2f24492
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-02-03T18:10:34Z

    Intermediate commit - need to fetch from PR432

commit eafc786250d9b8e6283bd71c91bbd270ba4d1311
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-02-03T18:52:03Z

    Get integration tests for flat file loader working with my branch. Fix trampled commit for ExtractorHandler

commit ad1aef760948109565b7144479151312ebccc24d
Author: Michael Miklavcic <mi...@gmail.com>
Date:   2017-02-03T19:46:05Z

    Get integration tests working for Stellar transformations in the file loader

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/445#discussion_r100131091
  
    --- Diff: metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.dataloads.extractor;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.log4j.Logger;
    +import org.apache.metron.common.configuration.ConfigurationsUtils;
    +import org.apache.metron.common.dsl.Context;
    +import org.apache.metron.common.dsl.MapVariableResolver;
    +import org.apache.metron.common.dsl.StellarFunctions;
    +import org.apache.metron.common.stellar.StellarPredicateProcessor;
    +import org.apache.metron.common.stellar.StellarProcessor;
    +import org.apache.metron.common.utils.ConversionUtils;
    +import org.apache.metron.common.utils.JSONUtils;
    +import org.apache.metron.enrichment.lookup.LookupKV;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.util.*;
    +
    +import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*;
    +
    +public class TransformFilterExtractorDecorator extends ExtractorDecorator {
    +  private static final Logger LOG = Logger.getLogger(TransformFilterExtractorDecorator.class);
    +
    +  protected enum ExtractorOptions {
    +    VALUE_TRANSFORM("value_transform"),
    +    VALUE_FILTER("value_filter"),
    +    INDICATOR_TRANSFORM("indicator_transform"),
    +    INDICATOR_FILTER("indicator_filter"),
    +    ZK_QUORUM("zk_quorum"),
    +    INDICATOR("indicator");
    +
    +    private String key;
    +
    +    ExtractorOptions(String key) {
    +      this.key = key;
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return key;
    +    }
    +
    +    public boolean existsIn(Map<String, Object> config) {
    +      return config.containsKey(key);
    +    }
    +  }
    +
    +  private Optional<CuratorFramework> zkClient;
    +  private Map<String, String> valueTransforms;
    +  private Map<String, String> indicatorTransforms;
    +  private String valueFilter;
    +  private String indicatorFilter;
    +  private Context stellarContext;
    +  private StellarProcessor transformProcessor;
    +  private StellarPredicateProcessor filterProcessor;
    +  private Map<String, Object> globalConfig;
    +
    +  public TransformFilterExtractorDecorator(Extractor decoratedExtractor) {
    +    super(decoratedExtractor);
    +    this.zkClient = Optional.empty();
    +    this.valueTransforms = new LinkedHashMap<>();
    +    this.indicatorTransforms = new LinkedHashMap<>();
    +    this.valueFilter = "";
    +    this.indicatorFilter = "";
    +    this.transformProcessor = new StellarProcessor();
    +    this.filterProcessor = new StellarPredicateProcessor();
    +  }
    +
    +  @Override
    +  public void initialize(Map<String, Object> config) {
    +    super.initialize(config);
    +    if (VALUE_TRANSFORM.existsIn(config)) {
    +      this.valueTransforms = getTransforms(config, VALUE_TRANSFORM.toString());
    +    }
    +    if (INDICATOR_TRANSFORM.existsIn(config)) {
    +      this.indicatorTransforms = getTransforms(config, INDICATOR_TRANSFORM.toString());
    +    }
    +    if (VALUE_FILTER.existsIn(config)) {
    +      this.valueFilter = getFilter(config, VALUE_FILTER.toString());
    +    }
    +    if (INDICATOR_FILTER.existsIn(config)) {
    +      this.indicatorFilter = getFilter(config, INDICATOR_FILTER.toString());
    +    }
    +    String zkClientUrl = "";
    +    if (ZK_QUORUM.existsIn(config)) {
    +      zkClientUrl = ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class);
    +    }
    +    zkClient = setupClient(zkClient, zkClientUrl);
    +    this.globalConfig = getGlobalConfig(zkClient);
    +    this.stellarContext = createContext(zkClient);
    +    StellarFunctions.initialize(stellarContext);
    +    this.transformProcessor = new StellarProcessor();
    +    this.filterProcessor = new StellarPredicateProcessor();
    +  }
    +
    +  private String getFilter(Map<String, Object> config, String valueFilter) {
    +    return ConversionUtils.convertOrFail(config.get(valueFilter), String.class);
    +  }
    +
    +  /**
    +   * Get a map of the transformations from the config of the specified type
    +   * @param config main config map
    +   * @param type the transformation type to get from config
    +   * @return map of transformations.
    +   */
    +  private Map<String, String> getTransforms(Map<String, Object> config, String type) {
    +    Map<Object, Object> transformsConfig = ConversionUtils.convertOrFail(config.get(type), Map.class);
    +    Map<String, String> transforms = new LinkedHashMap<>();
    +    for (Map.Entry<Object, Object> e : transformsConfig.entrySet()) {
    +      String key = ConversionUtils.convertOrFail(e.getKey(), String.class);
    +      String val = ConversionUtils.convertOrFail(e.getValue(), String.class);
    +      transforms.put(key, val);
    +    }
    +    return transforms;
    +  }
    +
    +  /**
    +   * Creates a Zookeeper client if it doesn't exist and a url for zk is provided.
    +   * @param zookeeperUrl The Zookeeper URL.
    +   */
    +  private Optional<CuratorFramework> setupClient(Optional<CuratorFramework> zkClient, String zookeeperUrl) {
    +    // can only create client if we have a valid zookeeper URL
    +    if (!zkClient.isPresent()) {
    +      if (StringUtils.isNotBlank(zookeeperUrl)) {
    +        CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
    +        client.start();
    +        return Optional.of(client);
    +      } else {
    +        LOG.warn("Unable to setup zookeeper client - zk_quorum url not provided. **This will limit some Stellar functionality**");
    +        return Optional.empty();
    +      }
    +    } else {
    +      return zkClient;
    +    }
    +  }
    +
    +  private Map<String, Object> getGlobalConfig(Optional<CuratorFramework> zkClient) {
    +    if (zkClient.isPresent()) {
    +      try {
    +        return JSONUtils.INSTANCE.load(
    +                new ByteArrayInputStream(ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(zkClient.get())),
    +                new TypeReference<Map<String, Object>>() {
    +                });
    +      } catch (Exception e) {
    +        LOG.warn("Exception thrown while attempting to get global config from Zookeeper.", e);
    +      }
    +    }
    +    return new LinkedHashMap<>();
    +  }
    +
    +  private Context createContext(Optional<CuratorFramework> zkClient) {
    +    Context.Builder builder = new Context.Builder();
    +    if (zkClient.isPresent()) {
    +      builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get)
    +              .with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
    --- End diff --
    
    I mean `{}`



[GitHub] incubator-metron issue #445: METRON-706: Add Stellar transformations and fil...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on the issue:

    https://github.com/apache/incubator-metron/pull/445
  
    Note: Per the recent issue in master with Ansible, I tested the following as well
    
    * Create threat_ip.csv
    ```
    #
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    #Add single column of ip address to alert
    #Public lists are available on the internet
    # example:
    23.113.113.105
    24.107.205.249
    24.108.62.255
    24.224.153.71
    27.4.1.212
    27.131.149.102
    31.24.30.31
    31.131.251.33
    31.186.99.250
    31.192.209.119
    31.192.209.150
    31.200.244.17
    37.34.52.185
    37.58.112.101
    37.99.146.27
    37.128.132.96
    37.140.195.177
    37.140.199.100
    ```
    
    * Upload threat_ip.csv to HDFS:
    ```
    hdfs dfs -put -f threat_ip.csv
    ```
    
    * Create extractor.json
    ```
    {
      "config": {
        "columns": {
          "ip": 0
        },
        "indicator_column": "ip",
        "type" : "malicious_ip",
        "separator": ","
      },
      "extractor": "CSV"
    }
    ```
    
    * Run as root user from /root
    ```
    echo "truncate 'threatintel'" | hbase shell && /usr/metron/0.3.0/bin/flatfile_loader.sh -c t -t threatintel -e extractor.json -i /user/root -m MR
    ```
    
    * Verify the records are there
    ```
    echo "scan 'threatintel'" | hbase shell
    ```
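
How an extractor config like the one above maps a CSV line to an indicator and a value can be sketched as follows. This is a hedged illustration, not Metron's CSV extractor: the class and method names are hypothetical, the separator is treated as a literal string, and skipping blank lines and lines that start with `#` is an assumption made so that the license header in threat_ip.csv produces no records.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

final class CsvExtractSketch {

    // Returns (indicator, value columns) for a data line, or empty for a skipped line.
    static Optional<Map.Entry<String, Map<String, String>>> extract(
            String line, Map<String, Integer> columns, String indicatorColumn, String separator) {
        String trimmed = line.trim();
        // Assumption: blank lines and '#' comment lines carry no data.
        if (trimmed.isEmpty() || trimmed.startsWith("#")) {
            return Optional.empty();
        }
        String[] tokens = trimmed.split(Pattern.quote(separator), -1);
        Map<String, String> value = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> column : columns.entrySet()) {
            if (column.getValue() < tokens.length) {
                value.put(column.getKey(), tokens[column.getValue()].trim());
            }
        }
        String indicator = value.get(indicatorColumn);
        if (indicator == null || indicator.isEmpty()) {
            return Optional.empty();
        }
        Map.Entry<String, Map<String, String>> entry = new SimpleEntry<>(indicator, value);
        return Optional.of(entry);
    }

    public static void main(String[] args) {
        Map<String, Integer> columns = new LinkedHashMap<>();
        columns.put("ip", 0); // "columns": {"ip": 0}
        for (String line : new String[] {"# example:", "23.113.113.105", ""}) {
            System.out.println(extract(line, columns, "ip", ","));
        }
    }
}
```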



[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/445#discussion_r100072226
  
    --- Diff: metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java ---
    @@ -35,6 +35,14 @@ protected ConvertUtilsBean initialValue() {
         }
       };
     
    +  public static <T> T convertOrFail(Object o, Class<T> clazz) {
    +    if (clazz.isInstance(o)) {
    --- End diff --
    
    Any reason why this isn't just `clazz.cast(o)` and called `cast`?
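
For reference, `Class.cast` in the JDK behaves as sketched below: it returns the object typed as `T`, passes `null` through, and throws `ClassCastException` on a mismatch. The `CastSketch` class and its method names are hypothetical and only wrap the standard call; this is not the PR's `convertOrFail`.

```java
final class CastSketch {

    // Thin wrapper around the standard Class.cast call.
    static <T> T cast(Object o, Class<T> clazz) {
        return clazz.cast(o);
    }

    // Reports whether casting o to clazz throws ClassCastException.
    static boolean castFails(Object o, Class<?> clazz) {
        try {
            clazz.cast(o);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String quorum = cast("node1:2181", String.class);
        System.out.println(quorum);                        // a String casts to String
        System.out.println(castFails(2181, String.class)); // an Integer does not
        System.out.println(castFails(null, String.class)); // null passes through without error
    }
}
```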



[GitHub] incubator-metron issue #445: METRON-706: Add Stellar transformations and fil...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on the issue:

    https://github.com/apache/incubator-metron/pull/445
  
    Adding these timing notes about the import for reference:
    
    **No filter, local load, multiple threads (5), batch 128**
    real	10m22.127s
    user	11m11.873s
    sys	4m23.912s
    903392 rows
    
    **With filters, multiple threads (5), batch 128 (1 record less)**
    real	10m58.210s
    user	11m10.592s
    sys	4m16.585s
    903391 rows
    
    **MapReduce mode**
    real	9m20.566s
    user	0m26.853s
    sys	0m10.334s
    903391 rows
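
To compare the three runs, the wall-clock (`real`) times can be converted to rows per second. The sketch below is only arithmetic on the figures quoted above; the class and method names are hypothetical.

```java
final class ThroughputSketch {

    // Converts a `time`-style value such as "10m22.127s" to seconds.
    static double toSeconds(String elapsed) {
        int m = elapsed.indexOf('m');
        double minutes = Double.parseDouble(elapsed.substring(0, m));
        double seconds = Double.parseDouble(elapsed.substring(m + 1, elapsed.length() - 1));
        return minutes * 60 + seconds;
    }

    static double rowsPerSecond(long rows, String elapsed) {
        return rows / toSeconds(elapsed);
    }

    public static void main(String[] args) {
        System.out.printf("no filter:    %.1f rows/s%n", rowsPerSecond(903392, "10m22.127s"));
        System.out.printf("with filters: %.1f rows/s%n", rowsPerSecond(903391, "10m58.210s"));
        System.out.printf("MapReduce:    %.1f rows/s%n", rowsPerSecond(903391, "9m20.566s"));
    }
}
```

On these numbers the local runs come out at about 1,450 and 1,370 rows per second and the MapReduce run at about 1,610, so the filters cost roughly 36 seconds of wall-clock time over the full load.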



[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/445#discussion_r100129984
  
    --- Diff: metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java ---
    @@ -35,6 +35,14 @@ protected ConvertUtilsBean initialValue() {
         }
       };
     
    +  public static <T> T convertOrFail(Object o, Class<T> clazz) {
    +    if (clazz.isInstance(o)) {
    --- End diff --
    
    Good point. I'm removing this entirely. I'll just use casting in place, e.g.
    
    **Example 1**
    `String val = (String) map.get("foo"); // throws ClassCastException on failure, which is what we want`
    
    **Example 2**
    ```
    Map<Object, Object> a = new HashMap<>();
    a.put("hello", "world");
    a.put(1, 2);
    
    Map<String, Object> b = new HashMap<>();
    b.put("a", a);
    
    @SuppressWarnings("unchecked")
    Map<Object, Object> c = (Map<Object, Object>) b.get("a"); // throws ClassCastException if not a Map
    Map<String, String> d = new HashMap<>();
    for (Map.Entry<Object, Object> entry : c.entrySet()) {
        // throws ClassCastException on the (1, 2) entry, which is also what we want
        d.put((String) entry.getKey(), (String) entry.getValue());
    }
    ```




[GitHub] incubator-metron issue #445: METRON-706: Add Stellar transformations and fil...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/445
  
    +1 by inspection



[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/445#discussion_r100087723
  
    --- Diff: metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java ---
    @@ -200,6 +295,7 @@ public static void teardown() throws Exception {
         multilineZipFile.delete();
         lineByLineExtractorConfigFile.delete();
         wholeFileExtractorConfigFile.delete();
    +    stellarExtractorConfigFile.delete();
    --- End diff --
    
    Good catch, fixing



[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/445#discussion_r100130952
  
    --- Diff: metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.dataloads.extractor;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.log4j.Logger;
    +import org.apache.metron.common.configuration.ConfigurationsUtils;
    +import org.apache.metron.common.dsl.Context;
    +import org.apache.metron.common.dsl.MapVariableResolver;
    +import org.apache.metron.common.dsl.StellarFunctions;
    +import org.apache.metron.common.stellar.StellarPredicateProcessor;
    +import org.apache.metron.common.stellar.StellarProcessor;
    +import org.apache.metron.common.utils.ConversionUtils;
    +import org.apache.metron.common.utils.JSONUtils;
    +import org.apache.metron.enrichment.lookup.LookupKV;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.util.*;
    +
    +import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*;
    +
    +public class TransformFilterExtractorDecorator extends ExtractorDecorator {
    +  private static final Logger LOG = Logger.getLogger(TransformFilterExtractorDecorator.class);
    +
    +  protected enum ExtractorOptions {
    +    VALUE_TRANSFORM("value_transform"),
    +    VALUE_FILTER("value_filter"),
    +    INDICATOR_TRANSFORM("indicator_transform"),
    +    INDICATOR_FILTER("indicator_filter"),
    +    ZK_QUORUM("zk_quorum"),
    +    INDICATOR("indicator");
    +
    +    private String key;
    +
    +    ExtractorOptions(String key) {
    +      this.key = key;
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return key;
    +    }
    +
    +    public boolean existsIn(Map<String, Object> config) {
    +      return config.containsKey(key);
    +    }
    +  }
    +
    +  private Optional<CuratorFramework> zkClient;
    +  private Map<String, String> valueTransforms;
    +  private Map<String, String> indicatorTransforms;
    +  private String valueFilter;
    +  private String indicatorFilter;
    +  private Context stellarContext;
    +  private StellarProcessor transformProcessor;
    +  private StellarPredicateProcessor filterProcessor;
    +  private Map<String, Object> globalConfig;
    +
    +  public TransformFilterExtractorDecorator(Extractor decoratedExtractor) {
    +    super(decoratedExtractor);
    +    this.zkClient = Optional.empty();
    +    this.valueTransforms = new LinkedHashMap<>();
    +    this.indicatorTransforms = new LinkedHashMap<>();
    +    this.valueFilter = "";
    +    this.indicatorFilter = "";
    +    this.transformProcessor = new StellarProcessor();
    +    this.filterProcessor = new StellarPredicateProcessor();
    +  }
    +
    +  @Override
    +  public void initialize(Map<String, Object> config) {
    +    super.initialize(config);
    +    if (VALUE_TRANSFORM.existsIn(config)) {
    +      this.valueTransforms = getTransforms(config, VALUE_TRANSFORM.toString());
    +    }
    +    if (INDICATOR_TRANSFORM.existsIn(config)) {
    +      this.indicatorTransforms = getTransforms(config, INDICATOR_TRANSFORM.toString());
    +    }
    +    if (VALUE_FILTER.existsIn(config)) {
    +      this.valueFilter = getFilter(config, VALUE_FILTER.toString());
    +    }
    +    if (INDICATOR_FILTER.existsIn(config)) {
    +      this.indicatorFilter = getFilter(config, INDICATOR_FILTER.toString());
    +    }
    +    String zkClientUrl = "";
    +    if (ZK_QUORUM.existsIn(config)) {
    +      zkClientUrl = ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class);
    +    }
    +    zkClient = setupClient(zkClient, zkClientUrl);
    +    this.globalConfig = getGlobalConfig(zkClient);
    +    this.stellarContext = createContext(zkClient);
    +    StellarFunctions.initialize(stellarContext);
    +    this.transformProcessor = new StellarProcessor();
    +    this.filterProcessor = new StellarPredicateProcessor();
    +  }
    +
    +  private String getFilter(Map<String, Object> config, String valueFilter) {
    +    return ConversionUtils.convertOrFail(config.get(valueFilter), String.class);
    +  }
    +
    +  /**
    +   * Get a map of the transformations from the config of the specified type
    +   * @param config main config map
    +   * @param type the transformation type to get from config
    +   * @return map of transformations.
    +   */
    +  private Map<String, String> getTransforms(Map<String, Object> config, String type) {
    +    Map<Object, Object> transformsConfig = ConversionUtils.convertOrFail(config.get(type), Map.class);
    +    Map<String, String> transforms = new LinkedHashMap<>();
    +    for (Map.Entry<Object, Object> e : transformsConfig.entrySet()) {
    +      String key = ConversionUtils.convertOrFail(e.getKey(), String.class);
    +      String val = ConversionUtils.convertOrFail(e.getValue(), String.class);
    +      transforms.put(key, val);
    +    }
    +    return transforms;
    +  }
    +
    +  /**
    +   * Creates a Zookeeper client if it doesn't exist and a url for zk is provided.
    +   * @param zookeeperUrl The Zookeeper URL.
    +   */
    +  private Optional<CuratorFramework> setupClient(Optional<CuratorFramework> zkClient, String zookeeperUrl) {
    +    // can only create client if we have a valid zookeeper URL
    +    if (!zkClient.isPresent()) {
    +      if (StringUtils.isNotBlank(zookeeperUrl)) {
    +        CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
    +        client.start();
    +        return Optional.of(client);
    +      } else {
    +        LOG.warn("Unable to setup zookeeper client - zk_quorum url not provided. **This will limit some Stellar functionality**");
    +        return Optional.empty();
    +      }
    +    } else {
    +      return zkClient;
    +    }
    +  }
    +
    +  private Map<String, Object> getGlobalConfig(Optional<CuratorFramework> zkClient) {
    +    if (zkClient.isPresent()) {
    +      try {
    +        return JSONUtils.INSTANCE.load(
    +                new ByteArrayInputStream(ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(zkClient.get())),
    +                new TypeReference<Map<String, Object>>() {
    +                });
    +      } catch (Exception e) {
    +        LOG.warn("Exception thrown while attempting to get global config from Zookeeper.", e);
    +      }
    +    }
    +    return new LinkedHashMap<>();
    +  }
    +
    +  private Context createContext(Optional<CuratorFramework> zkClient) {
    +    Context.Builder builder = new Context.Builder();
    +    if (zkClient.isPresent()) {
    +      builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get)
    +              .with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
    --- End diff --
    
    By empty, do you mean null or "{}"? Does Stellar handle that differently from choosing not to add the capability at all?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/incubator-metron/pull/445



[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

Posted by cestella <gi...@git.apache.org>.
GitHub user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/445#discussion_r100074335
  
    --- Diff: metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/extractor/TransformFilterExtractorDecorator.java ---
    @@ -0,0 +1,236 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.dataloads.extractor;
    +
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.log4j.Logger;
    +import org.apache.metron.common.configuration.ConfigurationsUtils;
    +import org.apache.metron.common.dsl.Context;
    +import org.apache.metron.common.dsl.MapVariableResolver;
    +import org.apache.metron.common.dsl.StellarFunctions;
    +import org.apache.metron.common.stellar.StellarPredicateProcessor;
    +import org.apache.metron.common.stellar.StellarProcessor;
    +import org.apache.metron.common.utils.ConversionUtils;
    +import org.apache.metron.common.utils.JSONUtils;
    +import org.apache.metron.enrichment.lookup.LookupKV;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.util.*;
    +
    +import static org.apache.metron.dataloads.extractor.TransformFilterExtractorDecorator.ExtractorOptions.*;
    +
    +public class TransformFilterExtractorDecorator extends ExtractorDecorator {
    +  private static final Logger LOG = Logger.getLogger(TransformFilterExtractorDecorator.class);
    +
    +  protected enum ExtractorOptions {
    +    VALUE_TRANSFORM("value_transform"),
    +    VALUE_FILTER("value_filter"),
    +    INDICATOR_TRANSFORM("indicator_transform"),
    +    INDICATOR_FILTER("indicator_filter"),
    +    ZK_QUORUM("zk_quorum"),
    +    INDICATOR("indicator");
    +
    +    private String key;
    +
    +    ExtractorOptions(String key) {
    +      this.key = key;
    +    }
    +
    +    @Override
    +    public String toString() {
    +      return key;
    +    }
    +
    +    public boolean existsIn(Map<String, Object> config) {
    +      return config.containsKey(key);
    +    }
    +  }
    +
    +  private Optional<CuratorFramework> zkClient;
    +  private Map<String, String> valueTransforms;
    +  private Map<String, String> indicatorTransforms;
    +  private String valueFilter;
    +  private String indicatorFilter;
    +  private Context stellarContext;
    +  private StellarProcessor transformProcessor;
    +  private StellarPredicateProcessor filterProcessor;
    +  private Map<String, Object> globalConfig;
    +
    +  public TransformFilterExtractorDecorator(Extractor decoratedExtractor) {
    +    super(decoratedExtractor);
    +    this.zkClient = Optional.empty();
    +    this.valueTransforms = new LinkedHashMap<>();
    +    this.indicatorTransforms = new LinkedHashMap<>();
    +    this.valueFilter = "";
    +    this.indicatorFilter = "";
    +    this.transformProcessor = new StellarProcessor();
    +    this.filterProcessor = new StellarPredicateProcessor();
    +  }
    +
    +  @Override
    +  public void initialize(Map<String, Object> config) {
    +    super.initialize(config);
    +    if (VALUE_TRANSFORM.existsIn(config)) {
    +      this.valueTransforms = getTransforms(config, VALUE_TRANSFORM.toString());
    +    }
    +    if (INDICATOR_TRANSFORM.existsIn(config)) {
    +      this.indicatorTransforms = getTransforms(config, INDICATOR_TRANSFORM.toString());
    +    }
    +    if (VALUE_FILTER.existsIn(config)) {
    +      this.valueFilter = getFilter(config, VALUE_FILTER.toString());
    +    }
    +    if (INDICATOR_FILTER.existsIn(config)) {
    +      this.indicatorFilter = getFilter(config, INDICATOR_FILTER.toString());
    +    }
    +    String zkClientUrl = "";
    +    if (ZK_QUORUM.existsIn(config)) {
    +      zkClientUrl = ConversionUtils.convert(config.get(ZK_QUORUM.toString()), String.class);
    +    }
    +    zkClient = setupClient(zkClient, zkClientUrl);
    +    this.globalConfig = getGlobalConfig(zkClient);
    +    this.stellarContext = createContext(zkClient);
    +    StellarFunctions.initialize(stellarContext);
    +    this.transformProcessor = new StellarProcessor();
    +    this.filterProcessor = new StellarPredicateProcessor();
    +  }
    +
    +  private String getFilter(Map<String, Object> config, String valueFilter) {
    +    return ConversionUtils.convertOrFail(config.get(valueFilter), String.class);
    +  }
    +
    +  /**
    +   * Get a map of the transformations from the config of the specified type
    +   * @param config main config map
    +   * @param type the transformation type to get from config
    +   * @return map of transformations.
    +   */
    +  private Map<String, String> getTransforms(Map<String, Object> config, String type) {
    +    Map<Object, Object> transformsConfig = ConversionUtils.convertOrFail(config.get(type), Map.class);
    +    Map<String, String> transforms = new LinkedHashMap<>();
    +    for (Map.Entry<Object, Object> e : transformsConfig.entrySet()) {
    +      String key = ConversionUtils.convertOrFail(e.getKey(), String.class);
    +      String val = ConversionUtils.convertOrFail(e.getValue(), String.class);
    +      transforms.put(key, val);
    +    }
    +    return transforms;
    +  }
    +
    +  /**
    +   * Creates a Zookeeper client if it doesn't exist and a url for zk is provided.
    +   * @param zookeeperUrl The Zookeeper URL.
    +   */
    +  private Optional<CuratorFramework> setupClient(Optional<CuratorFramework> zkClient, String zookeeperUrl) {
    +    // can only create client if we have a valid zookeeper URL
    +    if (!zkClient.isPresent()) {
    +      if (StringUtils.isNotBlank(zookeeperUrl)) {
    +        CuratorFramework client = ConfigurationsUtils.getClient(zookeeperUrl);
    +        client.start();
    +        return Optional.of(client);
    +      } else {
    +        LOG.warn("Unable to setup zookeeper client - zk_quorum url not provided. **This will limit some Stellar functionality**");
    +        return Optional.empty();
    +      }
    +    } else {
    +      return zkClient;
    +    }
    +  }
    +
    +  private Map<String, Object> getGlobalConfig(Optional<CuratorFramework> zkClient) {
    +    if (zkClient.isPresent()) {
    +      try {
    +        return JSONUtils.INSTANCE.load(
    +                new ByteArrayInputStream(ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(zkClient.get())),
    +                new TypeReference<Map<String, Object>>() {
    +                });
    +      } catch (Exception e) {
    +        LOG.warn("Exception thrown while attempting to get global config from Zookeeper.", e);
    +      }
    +    }
    +    return new LinkedHashMap<>();
    +  }
    +
    +  private Context createContext(Optional<CuratorFramework> zkClient) {
    +    Context.Builder builder = new Context.Builder();
    +    if (zkClient.isPresent()) {
    +      builder.with(Context.Capabilities.ZOOKEEPER_CLIENT, zkClient::get)
    +              .with(Context.Capabilities.GLOBAL_CONFIG, () -> globalConfig);
    --- End diff --
    
    You might want an empty global config even if the `zkClient` isn't present.  Not sure, just a thought.
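
To make the suggestion concrete, here is a minimal sketch of a context builder that always registers a global config and only registers the Zookeeper client when one exists. It deliberately does not use the Metron classes: `SketchContext` and `Capability` are hypothetical stand-ins for `Context` and `Context.Capabilities`, and whether the real Stellar `Context` treats a null-returning supplier the same as a missing capability is an assumption of this sketch, not something confirmed in this thread.

```java
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

public class GlobalConfigSketch {
  // Hypothetical stand-in for Context.Capabilities; not the Metron enum.
  enum Capability { ZOOKEEPER_CLIENT, GLOBAL_CONFIG }

  // Hypothetical stand-in for Context: capabilities are registered as suppliers.
  static final class SketchContext {
    private final Map<Capability, Supplier<Object>> capabilities = new EnumMap<>(Capability.class);

    SketchContext with(Capability capability, Supplier<Object> supplier) {
      capabilities.put(capability, supplier);
      return this;
    }

    // Empty when the capability was never registered or its supplier yields null.
    Optional<Object> getCapability(Capability capability) {
      Supplier<Object> supplier = capabilities.get(capability);
      return supplier == null ? Optional.empty() : Optional.ofNullable(supplier.get());
    }
  }

  // Always registers GLOBAL_CONFIG; registers the client only when one exists.
  static SketchContext createContext(Optional<Object> zkClient, Map<String, Object> globalConfig) {
    SketchContext context = new SketchContext();
    zkClient.ifPresent(client -> context.with(Capability.ZOOKEEPER_CLIENT, () -> client));
    // Fall back to an empty map ("{}") rather than null when no config was loaded.
    Map<String, Object> safeConfig = globalConfig == null ? new LinkedHashMap<>() : globalConfig;
    context.with(Capability.GLOBAL_CONFIG, () -> safeConfig);
    return context;
  }

  public static void main(String[] args) {
    SketchContext context = createContext(Optional.empty(), null);
    System.out.println(context.getCapability(Capability.GLOBAL_CONFIG).isPresent());    // true
    System.out.println(context.getCapability(Capability.ZOOKEEPER_CLIENT).isPresent()); // false

    // A supplier that returns null looks the same as a missing capability in this stand-in.
    SketchContext nullConfig = new SketchContext().with(Capability.GLOBAL_CONFIG, () -> null);
    System.out.println(nullConfig.getCapability(Capability.GLOBAL_CONFIG).isPresent()); // false
  }
}
```

In this stand-in, an empty map is observable as a present capability, while a null value is indistinguishable from never adding the capability, which is the distinction the "null or {}" question is asking about.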



[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

Posted by cestella <gi...@git.apache.org>.
GitHub user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/445#discussion_r100131179
  
    --- Diff: metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ConversionUtils.java ---
    @@ -35,6 +35,14 @@ protected ConvertUtilsBean initialValue() {
         }
       };
     
    +  public static <T> T convertOrFail(Object o, Class<T> clazz) {
    +    if (clazz.isInstance(o)) {
    --- End diff --
    
    Agreed



[GitHub] incubator-metron pull request #445: METRON-706: Add Stellar transformations ...

Posted by cestella <gi...@git.apache.org>.
GitHub user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/445#discussion_r100075392
  
    --- Diff: metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/flatfile/SimpleEnrichmentFlatFileLoaderIntegrationTest.java ---
    @@ -200,6 +295,7 @@ public static void teardown() throws Exception {
         multilineZipFile.delete();
         lineByLineExtractorConfigFile.delete();
         wholeFileExtractorConfigFile.delete();
    +    stellarExtractorConfigFile.delete();
    --- End diff --
    
    Did you forget `customLineByLineExtractorConfigFile.delete();`?
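
One way to avoid this class of omission is to track every temporary file at creation time and delete them all in one place. The following is only a sketch of that idea, not what the test in this PR does: `TempFileCleanupSketch`, `createTracked`, and `teardown` are hypothetical names, and only the standard `java.io.File` API is assumed.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TempFileCleanupSketch {
  // Every file created through createTracked is remembered here.
  private static final List<File> TRACKED = new ArrayList<>();

  // Creates a temp file and registers it for later cleanup.
  static File createTracked(String prefix, String suffix) throws IOException {
    File file = File.createTempFile(prefix, suffix);
    TRACKED.add(file);
    return file;
  }

  // Deletes all tracked files and returns how many were actually removed.
  static int teardown() {
    int deleted = 0;
    for (File file : TRACKED) {
      if (file.delete()) {
        deleted++;
      }
    }
    TRACKED.clear();
    return deleted;
  }
}
```

With this shape, adding a new config file to the test cannot leave a stale file behind, because the teardown no longer lists the files by name.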

