Posted to dev@metron.apache.org by nickwallen <gi...@git.apache.org> on 2017/02/09 20:34:11 UTC

[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

GitHub user nickwallen opened a pull request:

    https://github.com/apache/incubator-metron/pull/449

    METRON-701 Triage Metrics Produced by the Profiler

    ## [METRON-701](https://issues.apache.org/jira/browse/METRON-701)
    
    Please do not merge.  I am looking for feedback on this pull request.  I will comment on the specific area of code below.
    
    ### Problem
    
    The motivating example is that I would like to create an alert if the number of inbound flows to any host over a 15 minute interval is abnormal.
    
    The value being interrogated here, the number of inbound flows, is not a static value contained within any single telemetry message. This value is calculated across multiple messages by the Profiler. The current Threat Triage process cannot be used to interrogate values calculated by the Profiler.
    
    ### Proposed Solution
    
    I am proposing that we treat the Profiler as a source of telemetry. The measurements captured by the Profiler would be enqueued into a Kafka topic. We would then treat those Profiler messages like any other telemetry. We would parse, enrich, triage, and index those messages.
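
    The following is a minimal sketch of that idea. It assumes the counting profile used in the Testing steps below: `init` sets `count` to 0, `update` adds 1 for each Bro message, and `result` is `count`. The profile accumulates state during a period. When the period ends, its measurement becomes an ordinary flat message that could be written to Kafka.
    
    Some parts of the sketch are assumptions:
    * The class and method names are hypothetical.
    * A plain `Map` stands in for a `JSONObject`.
    * Resetting the state at each period boundary is an assumption.
    
    The field names are borrowed from the sample messages below. `period` is computed as `startTimeMillis / durationMillis`, which matches those samples.
    
    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    /** Hypothetical sketch of the counting profile; not the Profiler's actual code. */
    final class CountingProfileSketch {
    
        private final String profileName;
        private final String entity;
        private long count;   // "init": { "count": "0" }
    
        CountingProfileSketch(String profileName, String entity) {
            this.profileName = profileName;
            this.entity = entity;
            this.count = 0L;
        }
    
        /** Applies the "onlyif" filter and, if it passes, the "update" expression. */
        void apply(Map<String, Object> telemetry) {
            if ("bro".equals(telemetry.get("source.type"))) {   // "onlyif": "source.type == 'bro'"
                count = count + 1;                               // "update": { "count": "count + 1" }
            }
        }
    
        /** At the end of a period, emits the "result" as a flat message and re-initializes. */
        Map<String, Object> flush(long startTimeMillis, long durationMillis) {
            Map<String, Object> message = new LinkedHashMap<>();
            message.put("profileName", profileName);
            message.put("entity", entity);
            message.put("period", startTimeMillis / durationMillis);
            message.put("startTimeMillis", startTimeMillis);
            message.put("durationMillis", durationMillis);
            message.put("value", count);                         // "result": "count"
            count = 0L;                                          // the next period starts from "init"
            return message;
        }
    }
    ```
    
    For example, suppose the profile sees two Bro messages and one Squid message and is then flushed at 1486582980000 with a 60000 ms duration. The resulting message has `value` 2 and `period` 24776383.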
    
    ### Testing
    
    
    * If you are testing in the "Quick Dev" environment, increase the number of slots available to at least 5.  This can be done within Ambari by editing Storm's `supervisor.slots.ports` property.
    ```
    supervisor.slots.ports = [6700, 6701, 6702, 6703, 6704]
    ```
    
    * Alter the Profiler topology settings in `config/profiler.properties` to use a 1-minute period duration.  This is not necessary, but it is useful for the impatient.
    ```
    profiler.period.duration=1
    profiler.period.duration.units=MINUTES
    ```
    
    * Start only the Bro, Enrichment, and Profiler topologies.  Stop other unnecessary services like Elasticsearch and Kibana.
    ```
    service sensor-stubs start bro
    $METRON_HOME/bin/start_parser_topology.sh -k node1:6667 -z node1:2181 -s bro
    $METRON_HOME/bin/start_enrichment_topology.sh
    $METRON_HOME/bin/start_profiler_topology.sh
    ```
    
    * Create a Profile that simply counts messages.
    ```
    {
      "profiles": [
        {
          "profile": "test",
          "foreach": "'global'",
          "onlyif": "source.type == 'bro'",
          "init":    { "count": "0" },
          "update":  { "count": "count + 1" },
          "result":  "count"
        }
      ]
    }
    ```
    
    * Look for messages to hit the 'profiles' topic in Kafka.
    ```
    $ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic profiles --from-beginning
    
    {"profileName":"test","period":{"period":24776383,"startTimeMillis":1486582980000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":156,"entity":"global"}
    {"profileName":"test","period":{"period":24776384,"startTimeMillis":1486583040000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":158,"entity":"global"}
    {"profileName":"test","period":{"period":24776385,"startTimeMillis":1486583100000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":152,"entity":"global"}
    {"profileName":"test","period":{"period":24776386,"startTimeMillis":1486583160000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":158,"entity":"global"}
    {"profileName":"test","period":{"period":24776387,"startTimeMillis":1486583220000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":167,"entity":"global"}
    {"profileName":"test","period":{"period":24776388,"startTimeMillis":1486583280000,"durationMillis":60000},"groups":[],"definition":{"result":"count","foreach":"'global'","init":{"count":"0"},"onlyif":"true","profile":"test","destination":["hbase","kafka"],"update":{"count":"count + 1"},"groupBy":[]},"value":144,"entity":"global"}
    ```
    
    * Ensure that the Profiler continues to write to HBase.
    ```
    $ bin/stellar -z node1:2181
    Stellar, Go!
    Please note that functions are loading lazily in the background and will be unavailable until loaded fully.
    {es.clustername=metron, es.ip=node1, es.port=9300, es.date.format=yyyy.MM.dd.HH, bootstrap.servers=node1:6667, profiler.client.period.duration=1, profiler.client.period.duration.units=MINUTES}
    [Stellar]>>>
    [Stellar]>>> PROFILE_GET("test", "global", 5, "MINUTES")
    [146, 147, 142, 120, 151]
    ```
    
    * Start a parser to consume the profile messages.
    ```
    $METRON_HOME/bin/start_parser_topology.sh -k node1:6667 -z node1:2181 -s profiles
    ```
    
    * The parser should be adding `source.type=profiles` and `is_alert=true`.
    ```
    $ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic enrichments --from-beginning | grep profileName
    
    {"profileName":"test","original_string":"{\"profileName\":\"test\",\"period\":{\"period\":24776474,\"startTimeMillis\":1486588440000,\"durationMillis\":60000},\"groups\":[],\"definition\":{\"result\":\"count\",\"foreach\":\"'global'\",\"init\":{\"count\":\"0\"},\"onlyif\":\"source.type == 'bro'\",\"profile\":\"test\",\"destination\":[\"hbase\",\"kafka\"],\"update\":{\"count\":\"count + 1\"},\"groupBy\":[]},\"value\":156,\"entity\":\"global\"}","groups":[],"is_alert":true,"value":156,"entity":"global","timestamp":1486588475670,"source.type":"profiles"}
    ```
    
    * Add a threat triage rule that applies to the measurements coming out of the Profiler.  Run the following in a Stellar REPL.
    ```
    # fetch the current config
    conf := CONFIG_GET("ENRICHMENT", "profiles")
    
    # create a rule 
    rule := profileName == "test" and value > 100
    triage := { "name":"Test Rule", "rule": SHELL_GET_EXPRESSION('rule'), "score":"10" }
    conf := THREAT_TRIAGE_ADD(conf, [triage])
    
    # save the configuration
    CONFIG_PUT("ENRICHMENT", conf, "profiles")
    ```
    
    * Ensure that the threat triage rule is being applied correctly.  Look for a key named `threat.triage.level` when `value > 100`.
    ```
    $ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic indexing | grep profileName
    
    {"profileName":"test","enrichmentsplitterbolt.splitter.end.ts":"1486589135689","enrichmentsplitterbolt.splitter.begin.ts":"1486589135689","groups":[],"is_alert":"true","source.type":"profiles","original_string":"{\"profileName\":\"test\",\"period\":{\"period\":24776485,\"startTimeMillis\":1486589100000,\"durationMillis\":60000},\"groups\":[],\"definition\":{\"result\":\"count\",\"foreach\":\"'global'\",\"init\":{\"count\":\"0\"},\"onlyif\":\"source.type == 'bro'\",\"profile\":\"test\",\"destination\":[\"hbase\",\"kafka\"],\"update\":{\"count\":\"count + 1\"},\"groupBy\":[]},\"value\":160,\"entity\":\"global\"}","threatintelsplitterbolt.splitter.end.ts":"1486589135694","threat.triage.level":10.0,"threatinteljoinbolt.joiner.ts":"1486589135697","enrichmentjoinbolt.joiner.ts":"1486589135692","threatintelsplitterbolt.splitter.begin.ts":"1486589135694","value":160,"entity":"global","timestamp":1486589135684}
    ```
    
    These steps have shown how metrics produced by the Profiler can be triaged using the existing Threat Triage mechanism in Metron.
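    
    Conceptually, the triage step in the last two bullets works as follows. Each rule is evaluated against a profile message, and `threat.triage.level` is derived from the scores of the rules that match. The hypothetical Java sketch below hard-codes the rule `profileName == "test" and value > 100` as a `Predicate` and sums the matching scores.
    
    Summing is an assumption made for the sketch. How Metron combines the scores of several matching rules is not covered here. With a single rule, as in the test, a message with `value` 160 gets a level of 10.0, which agrees with the indexing output above.
    
    ```java
    import java.util.List;
    import java.util.Map;
    import java.util.function.Predicate;
    
    /** Hypothetical illustration of scoring a Profiler message; not Metron's triage code. */
    final class TriageSketch {
    
        /** A named rule and the score it contributes when its predicate matches. */
        static final class Rule {
            final String name;
            final Predicate<Map<String, Object>> predicate;
            final double score;
    
            Rule(String name, Predicate<Map<String, Object>> predicate, double score) {
                this.name = name;
                this.predicate = predicate;
                this.score = score;
            }
        }
    
        /** Java equivalent of the Stellar rule: profileName == "test" and value > 100 */
        static final Rule TEST_RULE = new Rule(
            "Test Rule",
            msg -> "test".equals(msg.get("profileName"))
                && msg.get("value") instanceof Number
                && ((Number) msg.get("value")).doubleValue() > 100,
            10.0);
    
        /** Sums the scores of all matching rules; returns null when no rule matches. */
        static Double triageLevel(Map<String, Object> message, List<Rule> rules) {
            double total = 0.0;
            boolean matched = false;
            for (Rule rule : rules) {
                if (rule.predicate.test(message)) {
                    total += rule.score;
                    matched = true;
                }
            }
            if (!matched) {
                return null;   // no triage level is assigned
            }
            return total;
        }
    }
    ```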

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickwallen/incubator-metron METRON-701

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-metron/pull/449.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #449
    
----
commit 88d355a7e9168575193fd105c331dbf67351e0a8
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-07T19:48:18Z

    METRON-701 Bulk writer expects a field named 'message' that is a JSONObject

commit e4e7e54c8332f9838f36ff2871fc61e9db6c661d
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-07T20:27:58Z

    METRON-701 Embedded the profile definition in the profile measurement itself.  Also added a hack because the KafkaBolt requires a JSONObject

commit fde7dfd56e2d555b4658c0abdd7f233aa37817b2
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-08T14:19:20Z

    METRON-701 Moved handling of default value of 'onlyif' to the ProfileConfig class

commit aa52e98327efaead5d9a2169a2d4e39be8889dd4
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-08T15:50:13Z

    METRON-701 kafka writer expects a JSONObject

commit c04d8a326b302f207e6fef2842cad4e4f8f45bae
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-08T17:04:08Z

    METRON-701 Introduced the DestinationHandler to simplify the differences between downstream destinations like Kafka and HBase

commit f46a5d196f9ee0d0cbea70b36caf60fe66716122
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-08T17:04:27Z

    METRON-701 The message should not be used in field grouping

commit a81d3120d4a7832bec879ec3f861670ab49e5aab
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-08T21:36:13Z

    METRON-701 Properly serializing the ProfileMeasurement.  Added documentation for new options

commit 6593481840d2348b7c86630f8d50cf755fa2b6f5
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-08T21:57:19Z

    METRON-701 Fixed up integration test

commit c4744aadd224917c88dc409882024b27754b6836
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-09T14:11:59Z

    METRON-701 Added check for invalid destinations

commit fe136d8adc0e6b6b67bc12e259088b76c5d6eecc
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-09T14:15:22Z

    METRON-701 Fixed exception message

commit 6b221df9fe206f225953df428e0e28fba3541aab
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-09T14:22:18Z

    METRON-701 Allowing the streamId to be set, just in case.

commit 152b75c3e38e1fde82c004209cce7d81a6529629
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-09T17:11:15Z

    METRON-701 Rename ttl

commit 8cc8008e208cc12a2399443ca6f8abe949d952e1
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-09T19:07:38Z

    Added README example 4 to integration tests.  Added missing license headers

commit e766c4806033387d50b1992a5a0dff9814b19370
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-09T20:07:40Z

    METRON-701 Fixed dependencies list

commit 5d329e64d89ed747d727a6cc2b8a1489f4db2dde
Author: Nick Allen <ni...@nickallen.org>
Date:   2017-02-09T20:28:37Z

    METRON-701 Fix up comments

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r102090017
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    @@ -0,0 +1,78 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.bolt;
    +
    +import org.apache.metron.common.utils.JSONUtils;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.json.simple.JSONObject;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Handles emitting a ProfileMeasurement to the stream which writes
    + * profile measurements to Kafka.
    + */
    +public class KafkaDestinationHandler implements DestinationHandler, Serializable {
    +
    +  /**
    +   * The stream identifier used for this destination;
    +   */
    +  private String streamId = "kafka";
    +
    +  @Override
    +  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    // the kafka writer expects a field named 'message'
    +    declarer.declareStream(getStreamId(), new Fields("message"));
    +  }
    +
    +  @Override
    +  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
    +
    +    try {
    +      JSONObject message = new JSONObject();
    +      message.put("profile", measurement.getDefinition().getProfile());
    +      message.put("entity", measurement.getEntity());
    +      message.put("period", measurement.getPeriod().getPeriod());
    +      message.put("periodStartTime", measurement.getPeriod().getStartTimeMillis());
    +
    +      // TODO How to serialize an object (like a StatisticsProvider) in a form that can be used on the other side? (Threat Triage)
    +      // TODO How to embed binary in JSON?
    +      message.put("value", measurement.getValue());
    +
    --- End diff --
    
    Pushing the data to Kafka is *not* meant as a replacement for HBase.  You still need to persist the Profile data in HBase, just like before.  The data is not pushed to Kafka to be used for retrieval.  The data is sent to Kafka to give us a hook into the triage process.  
    
    Let me try to explain better.  To triage something, I need to know two things...
    1. What is the current value? 
    2. What is an abnormal value?
    
    #### What is the current value? 
    The Profile data in Kafka helps answer the first question. What is the current value? Prior to this PR, there was no way for the Triage process to 'see' a summarized value.  I tried to explain this in the PR description (probably poorly) as follows.
    
    > The value being interrogated here, the number of inbound flows, is not a static value contained within any single telemetry message. This value is calculated across multiple messages by the Profiler. The current Threat Triage process cannot be used to interrogate values calculated by the Profiler.
    
    #### What is an abnormal value?
    The Profile data in HBase helps answer this second question.  What is an abnormal value?  The triage rules would still use the Profiler Client and retrieve the data from HBase that defines what normal is.  I think you are referring to this in your response.
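    
    To make the two questions concrete: the current value arrives on the Kafka topic, and the notion of "normal" is built from history fetched from HBase. For example, the history could be the five values that `PROFILE_GET` returned in the testing steps. The sketch below uses a simple z-score as a stand-in for whatever outlier test a real triage rule would apply. The class, its methods, and the threshold of 3 standard deviations are hypothetical.
    
    ```java
    import java.util.List;
    
    /** Hypothetical z-score check: is the current value far from the historical baseline? */
    final class BaselineSketch {
    
        static double mean(List<Double> history) {
            double sum = 0.0;
            for (double v : history) {
                sum += v;
            }
            return sum / history.size();
        }
    
        /** Sample standard deviation (n - 1 in the denominator); needs at least two values. */
        static double stddev(List<Double> history) {
            double m = mean(history);
            double squares = 0.0;
            for (double v : history) {
                squares += (v - m) * (v - m);
            }
            return Math.sqrt(squares / (history.size() - 1));
        }
    
        /** True when 'current' lies more than 'threshold' standard deviations from the mean. */
        static boolean isAbnormal(double current, List<Double> history, double threshold) {
            double sd = stddev(history);
            if (sd == 0.0) {
                return current != mean(history);   // flat history: any change is unusual
            }
            return Math.abs(current - mean(history)) / sd > threshold;
        }
    }
    ```
    
    With the history 146, 147, 142, 120, 151, the mean is 141.2 and the sample standard deviation is about 12.3. A current count of 160 (z of about 1.5) is therefore not flagged, while 250 (z of about 8.9) is.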
    
    Let me know if this provides any clarity or if I am misunderstanding the question.
    
    




[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    Still thinking through the implications, but it looks pretty clean and intuitive this way (at least more intuitive).  
    ```
    {
      "profiles": [
        {
          "profile": "test",
          "foreach": "'global'",
          "onlyif": "source.type == 'squid'",
          "update":  { "stats": "STATS_ADD(stats, LENGTH(url))" },
          "result":  {
            "profile" : "stats",
            "triage" : "{ 'mean' : 'STATS_MEAN(stats)', 'stddev' : 'STATS_SD(stats)' }"
          }
        }
      ]
    }
    ```
    
    Maybe even get rid of "result" altogether?
    ```
    {
      "profiles": [
        {
          "profile": "test",
          "foreach": "'global'",
          "onlyif": "source.type == 'squid'",
          "update":  { "stats": "STATS_ADD(stats, LENGTH(url))" },
          "profile" : "stats",
          "triage" : "{ 'mean' : 'STATS_MEAN(stats)', 'stddev' : 'STATS_SD(stats)' }"
        }
      ]
    }
    ```
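    
    Both variants rely on the profile state holding a running mean and standard deviation. A single-pass accumulator in the spirit of `STATS_ADD`, `STATS_MEAN`, and `STATS_SD` can be sketched with Welford's online algorithm. This is a hypothetical stand-in, not Metron's statistics implementation. It reports the sample (n - 1) standard deviation; whether `STATS_SD` uses the same convention is not stated in this thread.
    
    ```java
    /** Hypothetical online mean/variance accumulator (Welford's algorithm). */
    final class RunningStats {
    
        private long n;
        private double mean;
        private double m2;   // sum of squared deviations from the current mean
    
        /** Analogous to STATS_ADD(stats, x): folds one value into the summary. */
        RunningStats add(double x) {
            n++;
            double delta = x - mean;
            mean += delta / n;
            m2 += delta * (x - mean);
            return this;
        }
    
        long count() {
            return n;
        }
    
        /** Analogous to STATS_MEAN(stats); NaN when no values have been added. */
        double mean() {
            return n == 0 ? Double.NaN : mean;
        }
    
        /** Sample standard deviation; NaN with fewer than two values. */
        double stddev() {
            return n < 2 ? Double.NaN : Math.sqrt(m2 / (n - 1));
        }
    }
    ```
    
    Welford's update avoids keeping every value in memory. It is also numerically steadier than accumulating a raw sum of squares, which matters for a state object that lives for an entire period.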




[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    Ok, I'm ok with that.  The writer should be more adaptable here and that shouldn't hold your PR up, agreed.  Can we make it the enrichment queue, though?



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    I think routing from Stellar within routing in Storm is confusing.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    Yep, looks good, got my +1



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    This probably motivates us to allow `OUTLIER_MAD_SCORE` to accept the median and median absolute deviation as parameters, in addition to the State object.



[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r103494594
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    @@ -0,0 +1,106 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.bolt;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.json.simple.JSONObject;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Handles emitting a ProfileMeasurement to the stream which writes
    + * profile measurements to Kafka.
    + */
    +public class KafkaDestinationHandler implements DestinationHandler, Serializable {
    +
    +  /**
    +   * The stream identifier used for this destination;
    +   */
    +  private String streamId = "kafka";
    +
    +  /**
    +   * The 'source.type' of messages originating from the Profiler.
    +   */
    +  private String sourceType = "profiler";
    +
    +  @Override
    +  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    // the kafka writer expects a field named 'message'
    +    declarer.declareStream(getStreamId(), new Fields("message"));
    +  }
    +
    +  @Override
    +  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
    +
    +    JSONObject message = new JSONObject();
    +    message.put("profile", measurement.getDefinition().getProfile());
    +    message.put("entity", measurement.getEntity());
    +    message.put("period", measurement.getPeriod().getPeriod());
    +    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
    +    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
    +    message.put("timestamp", System.currentTimeMillis());
    --- End diff --
    
    The `timestamp` comes from `System.currentTimeMillis()`.  I don't like this very much, but I am not sure of a better alternative at the moment.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    > You seem to be sending every profile into kafka, not just the configured ones
    
    Just for clarity, you can define the destinations for each profile.  The default is `"destination" : ["hbase", "kafka"]`.  For example, to send only to HBase:
    ```
    {
      "profile": "profile-one-destination",
      "foreach": "ip_src_addr",
      "init":   { "x": "0" },
      "update": { "x": "x + 1" },
      "result": "x",
      "destination": ["hbase"]
    }
    ```
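    
    Here is a hypothetical sketch of how a per-profile `destination` list could be resolved. It falls back to the default `["hbase", "kafka"]` when the field is absent and rejects unknown names; the commit list above mentions a check for invalid destinations. The class name, the method, and the use of `IllegalArgumentException` are assumptions, not the code in this PR.
    
    ```java
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    
    /** Hypothetical resolution of a profile's "destination" setting; not the PR's code. */
    final class DestinationsSketch {
    
        static final Set<String> KNOWN = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("hbase", "kafka")));
    
        static final List<String> DEFAULT = Collections.unmodifiableList(
            Arrays.asList("hbase", "kafka"));
    
        /** Uses the default when "destination" is absent (null) and rejects unknown names. */
        static Set<String> resolve(List<String> configured) {
            List<String> requested = (configured == null) ? DEFAULT : configured;
            Set<String> result = new LinkedHashSet<>();
            for (String destination : requested) {
                if (!KNOWN.contains(destination)) {
                    throw new IllegalArgumentException("Invalid destination: " + destination);
                }
                result.add(destination);
            }
            return result;
        }
    }
    ```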
    
    But that is just a side point.  Your idea is really interesting.  We've talked before about having multiple result values, which I think is super useful.  I'll think on this a bit.  Thanks for the feedback.




[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    I can see the value of the additional flexibility here.  Of course, the flip side is that I am always worried about too much complexity, as you probably guessed. 
    
    I don't know if your proposal gets us all the way there with regard to favoring user-focused over implementation-focused terminology.  Personally, I'd like to see profile definitions that are portable and work no matter where the Profiler is configured to persist data.  But maybe that is a pipe dream.
    
    Your point on backwards compatibility is a good one.
    
    




[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    > Be backwards compatible with the current syntax.
    
    This proposed syntax isn't directly backwards compatible.  Were you assuming we would do a translation of sorts?  
    
    Like translate this...
    ```
    { "profiles": [
        {
          ...
          "result":  "stats"
        }
      ]
    }
    ```
    
    To this...
    ```
    { "profiles": [
        {
          ...
          "result":  {
            "profile" : "stats"
          }
        }
      ]
    }
    ```
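    
    One way such a translation could work is sketched below. It assumes the profile configuration is available as plain `Map`s before it is bound to a config class. A bare string `result` is expanded into an object holding only a `profile` expression, and an object is passed through unchanged. The class and method names are hypothetical.
    
    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    /** Hypothetical normalization of the "result" field for backwards compatibility. */
    final class ResultCompat {
    
        /** "result": "stats" becomes "result": { "profile": "stats" }; objects pass through. */
        @SuppressWarnings("unchecked")
        static Map<String, Object> normalize(Object result) {
            if (result instanceof String) {
                Map<String, Object> expanded = new LinkedHashMap<>();
                expanded.put("profile", result);
                return expanded;
            }
            if (result instanceof Map) {
                return (Map<String, Object>) result;
            }
            throw new IllegalArgumentException("Unsupported 'result': " + result);
        }
    }
    ```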



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    @cestella Thanks for laying out your other ideas for Medium and Longer term.  We can open those up for community debate on separate JIRAs, but it was very worthwhile for you to begin laying those out here.  They provided some good context.
    
    @ottobackwards Not sure which part your comment was in reference to.  Do you have any concerns specifically with the "Near Term" items that I will tackle as part of this PR?  I'd like to make sure we reach consensus on those first.  We can debate the other items later (if that works for you).
    




[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    > Ninja Edit: I think the kafka topic written to should be pulled from zookeeper...
    
    @cestella I remember now why I settled on making the topic name a static configuration from the topology properties.  Our `BulkMessageWriterBolt` & `KafkaMessageWriter` classes seem to have been designed to only work with statically defined topic names.  I would have to change those core classes, if I want to set the topic name dynamically from Zookeeper.  And I tend to get smacked around when coming close to core classes :)
    
    Am I missing something?  Is there a way to define the topic dynamically while using the `BulkMessageWriterBolt` & `KafkaMessageWriter` classes unchanged?
    
    If not, I'd prefer to keep the topic name as a static property defined in the topology properties, at least for this PR. My second choice would be to open a completely separate PR to update those core classes to accept dynamic topic names.  It does not seem like a complex change, but those are core classes and will likely cause some debate.
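    
    Keeping the topic as a static topology property amounts to reading it once when the topology is built, roughly as follows with `java.util.Properties`. The key `profiler.output.topic` and the fallback `profiles` are hypothetical placeholders; this thread does not name the actual property. In this sketch, changing the topic means redeploying the topology, which is the trade-off against reading it from Zookeeper.
    
    ```java
    import java.util.Properties;
    
    /** Hypothetical: resolve the Profiler's output topic from static topology properties. */
    final class TopicConfigSketch {
    
        // Placeholder key and default; not taken from the actual Profiler configuration.
        static final String TOPIC_KEY = "profiler.output.topic";
        static final String DEFAULT_TOPIC = "profiles";
    
        /** Reads the topic once, falling back to the default, and rejects a blank value. */
        static String outputTopic(Properties topologyProperties) {
            String topic = topologyProperties.getProperty(TOPIC_KEY, DEFAULT_TOPIC).trim();
            if (topic.isEmpty()) {
                throw new IllegalArgumentException(TOPIC_KEY + " must not be blank");
            }
            return topic;
        }
    }
    ```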
    
    
    




[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by james-sirota <gi...@git.apache.org>.
Github user james-sirota commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r100719426
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    @@ -0,0 +1,78 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.bolt;
    +
    +import org.apache.metron.common.utils.JSONUtils;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.json.simple.JSONObject;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Handles emitting a ProfileMeasurement to the stream which writes
    + * profile measurements to Kafka.
    + */
    +public class KafkaDestinationHandler implements DestinationHandler, Serializable {
    +
    +  /**
    +   * The stream identifier used for this destination;
    +   */
    +  private String streamId = "kafka";
    +
    +  @Override
    +  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    // the kafka writer expects a field named 'message'
    +    declarer.declareStream(getStreamId(), new Fields("message"));
    +  }
    +
    +  @Override
    +  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
    +
    +    try {
    +      JSONObject message = new JSONObject();
    +      message.put("profile", measurement.getDefinition().getProfile());
    +      message.put("entity", measurement.getEntity());
    +      message.put("period", measurement.getPeriod().getPeriod());
    +      message.put("periodStartTime", measurement.getPeriod().getStartTimeMillis());
    +
    +      // TODO How to serialize an object (like a StatisticsProvider) in a form that can be used on the other side? (Threat Triage)
    +      // TODO How to embed binary in JSON?
    +      message.put("value", measurement.getValue());
    +
    --- End diff --
    
    One problem I see with this approach is that you generally wouldn't ask, "Is the number of inbound connections from IP X abnormal for the past 15 minutes?" The question you would ask is, "Is the number of inbound connections for IP X abnormal for a Tuesday at 11:15 AM?" To answer that question, we would have to retrieve historical telemetry for Tuesdays at 11:15 AM and check whether the current value is an outlier relative to that data. METRON-690 is designed to make these retrieval patterns possible. Because you are retrieving values that potentially span more than 24 hours, I am not sure keeping this data in Kafka is a good idea. I think you have to retrieve the values via a multi-get in HBase.
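    
    To make the seasonal question concrete: you need the same period slot from previous weeks, plus a test of whether the current value is unusual against those values. A minimal sketch, assuming a simple z-score test (an illustrative choice, not something METRON-690 prescribes) and assuming the historical values have already been fetched, for example with an HBase multi-get keyed by the returned start times. All names are hypothetical.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    final class SeasonalOutlierSketch {
    
      static final long WEEK_MILLIS = TimeUnit.DAYS.toMillis(7);
    
      /** Start times of the same period slot in each of the previous {@code weeks} weeks. */
      static List<Long> sameSlotInPriorWeeks(long periodStartMillis, int weeks) {
        List<Long> starts = new ArrayList<>();
        for (int k = 1; k <= weeks; k++) {
          starts.add(periodStartMillis - k * WEEK_MILLIS);
        }
        return starts;
      }
    
      /** True if {@code current} is more than {@code threshold} sample standard deviations from the historical mean. */
      static boolean isOutlier(double current, List<Double> history, double threshold) {
        if (history.size() < 2) {
          return false; // too little history to judge
        }
        double mean = 0;
        for (double v : history) {
          mean += v;
        }
        mean /= history.size();
        double sumSq = 0;
        for (double v : history) {
          sumSq += (v - mean) * (v - mean);
        }
        double sd = Math.sqrt(sumSq / (history.size() - 1));
        if (sd == 0) {
          return current != mean; // flat history: any change is unusual
        }
        return Math.abs(current - mean) / sd > threshold;
      }
    }
    ```
    
    The weekly offsets are plain UTC milliseconds, so a daylight-saving change would shift the local time of day by an hour; a real implementation may want calendar-aware arithmetic.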



[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r100408667
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    +      // TODO How to serialize an object (like a StatisticsProvider) in a form that can be used on the other side? (Threat Triage)
    +      // TODO How to embed binary in JSON?
    +      message.put("value", measurement.getValue());
    +
    --- End diff --
    
    Everything works just fine if your profile produces a simple, easy-to-serialize value like a number. If all you want to do is triage profiles that produce numbers, then this code works. But how should this work if your profile produces a complex object, as the STATS package does?
    
    The Profiler has to serialize everything that the profiles produce.  Serializing objects into HBase works just fine.  But how should we serialize a complex object when sending it to Kafka?   
    
    Random options follow.  What else might work?
    
    * Sticking with the current architecture, I would have to embed binary data in the JSON message that is produced. JSON has no binary type, so the bytes would have to be encoded as text (for example, Base64), which is not ideal.
    
    * I could write just the binary data to a Kafka topic, but then how do we pass meta-information like the profile and entity name? Do I create a wrapper object that contains the entity name, profile, and value, then serialize all of that and write it to a topic? That would be drastically different from what we do today.
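    
    The first option can still work if the serialized bytes are carried as text. A minimal sketch, assuming plain Java serialization (the Profiler's actual HBase serializer may differ) and Base64 via `java.util.Base64`; the class and method names are hypothetical.
    
    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.Base64;
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    final class Base64EmbeddingSketch {
    
      /** Java-serializes {@code value} and returns the bytes as Base64 text. */
      static String toBase64(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(value);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
      }
    
      /** Reverses {@link #toBase64}; the consumer needs the same classes on its classpath. */
      static Object fromBase64(String encoded) throws IOException, ClassNotFoundException {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
          return in.readObject();
        }
      }
    
      /** Builds a JSON-ready map whose "value" field holds the Base64 text. */
      static Map<String, Object> message(String profile, String entity, Serializable value) throws IOException {
        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("profile", profile);
        msg.put("entity", entity);
        msg.put("value", toBase64(value));
        return msg;
      }
    }
    ```
    
    The catch is on the consuming side: Threat Triage would need the same classes to decode `value`, and the encoded string is opaque to any rule written against the message.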
    
    
    
    
    
    




[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    Well, let me try to make the case that this is user-focused while being aware of the limitations of implementation. ;)
    
    The main aim of this adaptability is to allow multiple representations to be stored in multiple data stores. A representation has a one-to-many relationship with data stores (maybe "writer" could be either a single writer or a list of writers?). This makes the representation the top-level citizen, named for what it is intended to be used for. Put simply, it's not that Kafka can only handle JSON blobs; rather, it's that we need the kurtosis for the `kurtosis_triage` rule (by the way, `kurtosis_triage` should be part of the message constructed along with the `source.type` of `profiler`... maybe as `profile.type`).
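    
    One way to read the one-to-many relationship: each named representation carries its own expression and a list of writers, and each writer receives only the representations that list it. A minimal sketch; the `Representation` type, its fields, and the writer names are hypothetical, and expressions are kept as opaque strings.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    
    final class RepresentationSketch {
    
      /** A named representation of a profile's output and the writers that receive it. */
      static final class Representation {
        final String name;          // what it is for, e.g. "kurtosis_triage"
        final String expression;    // the expression producing it, not evaluated here
        final List<String> writers; // one or more writers, e.g. "hbase", "kafka"
    
        Representation(String name, String expression, String... writers) {
          this.name = name;
          this.expression = expression;
          this.writers = Arrays.asList(writers);
        }
      }
    
      /** Groups representation names by the writer that should receive them. */
      static Map<String, List<String>> byWriter(List<Representation> reps) {
        Map<String, List<String>> out = new TreeMap<>();
        for (Representation rep : reps) {
          for (String writer : rep.writers) {
            out.computeIfAbsent(writer, w -> new ArrayList<>()).add(rep.name);
          }
        }
        return out;
      }
    }
    ```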



[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-metron/pull/449



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by ottobackwards <gi...@git.apache.org>.
Github user ottobackwards commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    I'm sorry, no, Nick, I don't.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    @cestella The core of your idea, the part that really solves the problem I was hung up on, was adding two "result" expressions, one for each destination.
    * The HBase destination is just like it is now. I can write any serializable Java object.
    * The Kafka destination is more limited. Its expression *must* result in a String. We force the user to serialize the data, and we kind of have to; we have no good way to embed a serialized object in a JSON message.
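    
    The split described above amounts to a per-destination type rule. A minimal sketch; the destination names mirror this comment, and the method is hypothetical rather than code from the PR.
    
    ```java
    import java.io.Serializable;
    
    final class DestinationResultCheck {
    
      /** Returns true if {@code result} is acceptable for the named destination. */
      static boolean isAcceptable(String destination, Object result) {
        switch (destination) {
          case "hbase":
            // HBase: any serializable Java object can be written
            return result instanceof Serializable;
          case "kafka":
            // Kafka: the expression must already have produced a String
            return result instanceof String;
          default:
            throw new IllegalArgumentException("unknown destination: " + destination);
        }
      }
    }
    ```
    
    An unknown destination throws because it is a configuration mistake, which is better caught when the profile is loaded than when a measurement is flushed.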



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    @nickwallen yes, I was



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    **Near Term:**  I like it.   I think we've converged on "near term".  Yay!  I will tackle these items as part of this PR.
    
    > **Longer Term:** ... In this world, the profiler is simple, it just writes messages out to the indexing topology. 
    
    Great! Yep, this is what I was hoping to do when we first started batting this idea around.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    Looks great, quick question.  If I submit a profile that looks like:
    ```
    {
      "profiles": [
        {
          "profile": "test",
          "foreach": "'global'",
          "onlyif": "source.type == 'bro'",
          "init":    { "count": "0" },
          "update":  { "count": "count + 1" },
          "result": {
              "profile": "count",
              "triage": "{ 'blah' : count, 'zork' : 'zork'}"
          }
        }
      ]
    }
    ```
    Will I get messages in kafka that look like:
    ```
    {"period.start":1488233820000
    ,"period":24803897
    ,"profile":"test"
    ,"blah":161
    ,"zork":"zork"
    ,"period.end":1488233880000
    ,"is_alert":"true"
    ,"entity":"global"
    ,"timestamp":1488233841600
    ,"source.type":"profiler"
    }
    ```
    
    I think that's an important aspect, as people will probably want to send multiple values to further triage or to give context, since they cannot send along our summary objects.
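    
    The flattening asked about here can be sketched as: build the fixed fields, then copy every entry of the `triage` result in as a top-level field. A minimal sketch that mirrors the fields in the sample message; the class and method are hypothetical and use a plain `Map` rather than `JSONObject`.
    
    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    final class TriageMessageSketch {
    
      /** Builds the outgoing message: fixed fields first, then each triage value as a top-level field. */
      static Map<String, Object> build(String profile, String entity, long period,
                                       long periodStart, long periodEnd, long timestamp,
                                       Map<String, Object> triageValues) {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("profile", profile);
        message.put("entity", entity);
        message.put("period", period);
        message.put("period.start", periodStart);
        message.put("period.end", periodEnd);
        message.put("timestamp", timestamp);
        message.put("source.type", "profiler");
        message.put("is_alert", "true");
        // flatten the triage block: { 'blah' : count, 'zork' : 'zork' } -> "blah", "zork"
        message.putAll(triageValues);
        return message;
      }
    }
    ```
    
    One design question this exposes: `putAll` lets a triage key such as `entity` silently overwrite a fixed field, so a real handler may want to reject or prefix colliding keys.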
    
    
    Also, if someone tries to submit something that JSON can't handle (like a stats object), will it get dropped or will an exception occur?



[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r103495230
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    @@ -0,0 +1,106 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.bolt;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.json.simple.JSONObject;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Handles emitting a ProfileMeasurement to the stream which writes
    + * profile measurements to Kafka.
    + */
    +public class KafkaDestinationHandler implements DestinationHandler, Serializable {
    +
    +  /**
    +   * The stream identifier used for this destination;
    +   */
    +  private String streamId = "kafka";
    +
    +  /**
    +   * The 'source.type' of messages originating from the Profiler.
    +   */
    +  private String sourceType = "profiler";
    +
    +  @Override
    +  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    // the kafka writer expects a field named 'message'
    +    declarer.declareStream(getStreamId(), new Fields("message"));
    +  }
    +
    +  @Override
    +  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
    +
    +    JSONObject message = new JSONObject();
    +    message.put("profile", measurement.getDefinition().getProfile());
    +    message.put("entity", measurement.getEntity());
    +    message.put("period", measurement.getPeriod().getPeriod());
    +    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
    +    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
    +    message.put("timestamp", System.currentTimeMillis());
    +    message.put("source.type", sourceType);
    +    message.put("is_alert", "true");
    +
    +    // append each of the triage values to the message
    +    measurement.getTriageValues().forEach((key, value) -> {
    +
    +      if(isValidType(value)) {
    +        message.put(key, value);
    +
    +      } else {
    +        throw new IllegalArgumentException(String.format("invalid type for triage value: profile=%s, entity=%s, key=%s, value=%s",
    --- End diff --
    
    I'd recommend reporting the error rather than killing the topology, so that it shows up in the Storm UI.
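    
    Storm's `OutputCollector.reportError(Throwable)` surfaces an error in the Storm UI without failing the bolt. A minimal sketch of reporting and skipping a bad triage value instead of throwing; a `Consumer<Throwable>` stands in for `collector::reportError` so the example runs without Storm, and the accepted types (String, Number, Boolean) are an assumption about what the JSON message can hold.
    
    ```java
    import java.util.Map;
    import java.util.function.Consumer;
    
    final class ReportAndSkipSketch {
    
      /** Types assumed to be safe to place in a JSON message. */
      static boolean isValidType(Object value) {
        return value instanceof String || value instanceof Number || value instanceof Boolean;
      }
    
      /**
       * Copies valid triage values into {@code message}; invalid ones are reported
       * through {@code reportError} and skipped, so the remaining values are still sent.
       */
      static Map<String, Object> appendTriageValues(Map<String, Object> message,
                                                    Map<String, Object> triageValues,
                                                    Consumer<Throwable> reportError) {
        triageValues.forEach((key, value) -> {
          if (isValidType(value)) {
            message.put(key, value);
          } else {
            String type = (value == null) ? "null" : value.getClass().getName();
            reportError.accept(new IllegalArgumentException(
                String.format("invalid type for triage value: key=%s, type=%s", key, type)));
          }
        });
        return message;
      }
    }
    ```
    
    Whether one bad value should suppress the whole message is a separate choice; this sketch still sends the remaining values.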



[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r103493583
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    +    // append each of the triage values to the message
    +    measurement.getTriageValues().forEach((key, value) -> {
    +
    +      if(isValidType(value)) {
    +        message.put(key, value);
    +
    +      } else {
    +        throw new IllegalArgumentException(String.format("invalid type for triage value: profile=%s, entity=%s, key=%s, value=%s",
    --- End diff --
    
    @cestella asked...
    
    > Also, if someone tries to submit something that JSON can't handle (like a stats object), will it get dropped or will an exception occur?
    
    Good question.  The type check for the `triage` block occurs here in `KafkaDestinationHandler`.  This is called by `ProfileBuilderBolt.handleTick` when a tick tuple is received.  The `KafkaDestinationHandler` throws an exception which is then caught and logged by the bolt.
    
    I originally thought this was OK, but after looking it over again, an exception here would impact all of the Profile-Entity pairs managed by the same `ProfileBuilderBolt` instance, not just the specific problematic profile.
    
    I think I need to change this to log the error and move on to the next triage expression.  Of course, as a user I'd like to know explicitly if I make this mistake.  Hmm... I'll have to noodle on this.
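    
    The blast radius described above comes from letting one exception escape the loop over every Profile-Entity pair. A minimal sketch of isolating each emit; the generic `T` stands in for `ProfileMeasurement` and the `Consumer` for a destination handler's `emit`, both hypothetical simplifications.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;
    
    final class PerMeasurementIsolationSketch {
    
      /**
       * Emits each measurement independently. A RuntimeException from one emit is
       * logged and the loop moves on, so other Profile-Entity pairs are unaffected.
       */
      static <T> List<T> emitAll(List<T> measurements, Consumer<T> emit, List<String> log) {
        List<T> emitted = new ArrayList<>();
        for (T measurement : measurements) {
          try {
            emit.accept(measurement);
            emitted.add(measurement);
          } catch (RuntimeException e) {
            log.add("failed to emit " + measurement + ": " + e.getMessage());
          }
        }
        return emitted;
      }
    }
    ```
    
    Logging alone does not tell the user about the mistake; pairing this with an error report visible in the Storm UI would cover that concern.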



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    I made the required changes and updated the PR description to reflect that.  Please take a look and review.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    So, I think this is an interesting approach.  My issues with it are:
    * You seem to be sending every profile into Kafka, not just the configured ones.
    * You seem to be assuming that only one value is being sent into the telemetry and that it's the value you store in HBase.
    
    I'd recommend, rather, that you make the `result` field more complex by making it a map where the key is the destination (e.g. "hbase" or "kafka"). This allows you to separate the storage structure by storage medium. You may, for instance, want to *STORE* a stats object in HBase, but only send along the mean and standard deviation to Kafka. Also, I'd recommend allowing `result` to be either a string (which would presume only HBase is supported) or a map, which would explicitly specify the structure for just the destinations you want to write to.
    
    Here's a worked example config for maximum clarity (!):
    ```
    {
      "profiles": [
        {
          "profile": "test",
          "foreach": "'global'",
          "onlyif": "source.type == 'squid'",
          "init":    { "stats": "STATS_INIT()" },
          "update":  { "stats": "STATS_ADD(stats, LENGTH(url))" },
          "result":  {
             "hbase" : "stats",
             "kafka" : "{ 'mean' : STATS_MEAN(stats), 'stddev' : STATS_SD(stats) }"
           }
          
        }
      ]
    }
    ```
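    
    The backward-compatible rule proposed above (a string means HBase only; a map names each destination explicitly) can be sketched as a normalization step applied when a profile definition is loaded. A minimal sketch; the class and method are hypothetical, and expressions are kept as opaque strings rather than evaluated.
    
    ```java
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    final class ResultConfigSketch {
    
      /**
       * Normalizes a profile's "result" value into destination -> expression.
       * A bare String is treated as HBase-only; a Map names each destination explicitly.
       */
      static Map<String, String> normalize(Object result) {
        if (result instanceof String) {
          return Collections.singletonMap("hbase", (String) result);
        }
        if (result instanceof Map) {
          Map<String, String> out = new LinkedHashMap<>();
          for (Map.Entry<?, ?> entry : ((Map<?, ?>) result).entrySet()) {
            out.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
          }
          return out;
        }
        throw new IllegalArgumentException("'result' must be a String or a Map, got: " + result);
      }
    }
    ```
    
    Normalizing early means the rest of the Profiler only ever sees the map form, and a profile without a `kafka` key naturally sends nothing to Kafka, which also addresses the concern about every profile being sent.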



[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r103507775
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    +    // append each of the triage values to the message
    +    measurement.getTriageValues().forEach((key, value) -> {
    +
    +      if(isValidType(value)) {
    +        message.put(key, value);
    +
    +      } else {
    +        throw new IllegalArgumentException(String.format("invalid type for triage value: profile=%s, entity=%s, key=%s, value=%s",
    --- End diff --
    
    Fixed in the latest commit.



[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r103496490
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    +    JSONObject message = new JSONObject();
    +    message.put("profile", measurement.getDefinition().getProfile());
    +    message.put("entity", measurement.getEntity());
    +    message.put("period", measurement.getPeriod().getPeriod());
    +    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
    +    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
    +    message.put("timestamp", System.currentTimeMillis());
    --- End diff --
    
    Hmm, maybe the period.start?  
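    The suggestion is to stamp each message with the start of its profile period rather than the wall-clock time of the flush. The sketch below is illustrative only. It assumes periods are fixed-width windows aligned to the epoch, so the period index is `epochMillis / durationMillis`. That mirrors, but is not copied from, the Profiler's own period logic, and `PeriodSketch` is a hypothetical name.

    ```java
    // Hypothetical sketch: derive period boundaries from an event time,
    // assuming periods are fixed-width windows aligned to the epoch.
    final class PeriodSketch {
      final long period;          // index of the period since the epoch
      final long startTimeMillis; // inclusive start of the period
      final long endTimeMillis;   // exclusive end of the period

      PeriodSketch(long epochMillis, long durationMillis) {
        this.period = epochMillis / durationMillis;
        this.startTimeMillis = period * durationMillis;
        this.endTimeMillis = startTimeMillis + durationMillis;
      }

      // Option A: wall-clock time at flush; varies with when the bolt flushes.
      static long wallClockTimestamp() {
        return System.currentTimeMillis();
      }

      // Option B: the period start; identical for every flush of the same period.
      long periodStartTimestamp() {
        return startTimeMillis;
      }
    }
    ```

    With the values from the sample message later in this thread (timestamp 1488233841600, 1 minute period), this yields period 24803897 with start 1488233820000 and end 1488233880000. These match the `period`, `period.start`, and `period.end` fields shown there.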


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    @nickwallen Yep, I see what you mean.  I think we had different interpretations of "user focused."
    
    I think where I've landed can be broken down into near-term, medium-term, and long-term visions for the profiler.
    
    **Near Term**
    In the near term, for this PR, we need the ability to:
    * Write from the profiler to Kafka so we can triage the output of the profiler
    * Adjust the representation of the data the profiler writes based on where it writes to
      * For HBase, any Kryo-serializable object
      * For Kafka, any fundamental structure (e.g. number, string) or a map of fundamental structures
    * Be backwards compatible with the current syntax.
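    The near-term rules for what each destination may carry can be expressed as a simple type check. This is a sketch under stated assumptions, and the class and method names are hypothetical. "Fundamental" is taken to mean a `Number`, `String`, `Boolean`, or `Character`. Map keys are required to be strings. HBase acceptance is approximated as "any `Serializable`", which only stands in for "any Kryo-serializable object".

    ```java
    import java.io.Serializable;
    import java.util.Map;

    // Hypothetical validator for the near-term representation rules.
    final class DestinationTypes {

      // Assumption: "fundamental" means a number, string, boolean, or character.
      static boolean isFundamental(Object value) {
        return value instanceof Number
            || value instanceof String
            || value instanceof Boolean
            || value instanceof Character;
      }

      // Kafka: a fundamental value, or a Map of fundamental values.
      // Assumption: keys must be strings so they can become JSON field names.
      static boolean isValidForKafka(Object value) {
        if (isFundamental(value)) {
          return true;
        }
        if (value instanceof Map) {
          for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
            if (!(e.getKey() instanceof String) || !isFundamental(e.getValue())) {
              return false;
            }
          }
          return true;
        }
        return false;
      }

      // HBase: approximated here as anything Serializable (Kryo can handle more).
      static boolean isValidForHBase(Object value) {
        return value instanceof Serializable;
      }
    }
    ```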
    
    Strictly speaking, I'll adopt your approach here of separating representation by destination, where the destinations are restricted to those possible inside Metron's current architecture (so-called *destination-focused*), rather than separating representation by storage mechanism, where the mechanisms are restricted to those we support in Metron (so-called *writer-focused*).
    
    In the following example, the following happens on every tick:
    * 1 message is written to HBase with the stats function
    * 1 message that looks like this is written to Kafka:
    ```
    {
      'profile' : 'test',
      'entity' : 'global',
      'mean' : ####,
      'stddev' : ####
    }
    ```
    
    The profile definition looks like this:
    ```
    {
      "profiles": [
        {
          "profile": "test",
          "foreach": "'global'",
          "onlyif": "source.type == 'squid'",
          "update":  { "stats": "STATS_ADD(stats, LENGTH(url))" },
          "result":  {
             "profile" : "stats",
             "triage" : "{ 'mean' : STATS_MEAN(stats), 'stddev' : STATS_SD(stats) }"
           }     
        }
      ]
    }
    ```
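    One way to stay backwards compatible with the current syntax, where `result` is a single expression string, is to normalize both forms into a map of named expressions before evaluation. A minimal sketch, assuming a plain string is treated as "profile only" and the parsed config arrives as a `Map`. `ResultConfig.normalize` is hypothetical and is not the Profiler's actual parsing code.

    ```java
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical normalization of the "result" field of a profile definition.
    final class ResultConfig {

      // Returns a map of output name ("profile", "triage") to Stellar expression.
      static Map<String, String> normalize(Object result) {
        if (result instanceof String) {
          // Old syntax: "result": "count" is treated as the HBase (profile) output only.
          return Collections.singletonMap("profile", (String) result);
        }
        if (result instanceof Map) {
          // New syntax: "result": { "profile": "...", "triage": "..." }.
          // Near-term form only; nested maps would need another level of handling.
          Map<String, String> normalized = new LinkedHashMap<>();
          for (Map.Entry<?, ?> e : ((Map<?, ?>) result).entrySet()) {
            normalized.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
          }
          return normalized;
        }
        throw new IllegalArgumentException("unsupported 'result' value: " + result);
      }
    }
    ```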
    
    **Medium Term**
    
    This is expanded to allow multiple results to be written per profile.
    
    In the following example, the following happens on every tick:
    * 2 messages are written to HBase for profile `test`
      * entity: `global:stats`
      * entity: `global:count`
    * 2 messages are written to Kafka, which look like this:
    ```
    {
      'profile' : 'test',
      'entity' : 'global',
      'result_type' : 'baseline_stats',
      'mean' : ####,
      'stddev' : ####
    }
    ```
    and
    ```
    {
      'profile' : 'test',
      'entity' : 'global',
      'result_type' : 'kurtosis',
      'kurtosis' : ####
    }
    ```
    
    The profile definition looks like this:
    ```
    {
      "profiles": [
        {
          "profile": "test",
          "foreach": "'global'",
          "onlyif": "source.type == 'squid'",
          "update":  { "stats": "STATS_ADD(stats, LENGTH(url))" },
          "result":  {
             "profile" : {
                          "stats" :  "stats",
                          "count" : "STATS_COUNT(stats)"
             "triage" : {
                          "baseline_stats" : "{ 'mean' : STATS_MEAN(stats), 'stddev' : STATS_SD(stats) }",
                          "kurtosis" : "STATS_KURTOSIS(stats)"
                           }
                       }   
        }
      ]
    }
    ```
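    The medium-term fan-out, where each named `triage` expression becomes its own Kafka message tagged with `result_type`, might look roughly like the following. This sketch assumes the expressions have already been evaluated into Java values, and `TriageFanOut` and `toMessages` are hypothetical names. Following the two example messages above, a map result (such as `baseline_stats`) is merged into the message, while a scalar result (such as `kurtosis`) is stored under its own name.

    ```java
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical fan-out of evaluated triage results into one message per result.
    final class TriageFanOut {

      static List<Map<String, Object>> toMessages(String profile, String entity,
                                                  Map<String, Object> evaluated) {
        List<Map<String, Object>> messages = new ArrayList<>();
        for (Map.Entry<String, Object> result : evaluated.entrySet()) {
          Map<String, Object> message = new LinkedHashMap<>();
          message.put("profile", profile);
          message.put("entity", entity);
          message.put("result_type", result.getKey());
          Object value = result.getValue();
          if (value instanceof Map) {
            // e.g. baseline_stats -> { mean, stddev } become top-level fields
            for (Map.Entry<?, ?> field : ((Map<?, ?>) value).entrySet()) {
              message.put(String.valueOf(field.getKey()), field.getValue());
            }
          } else {
            // e.g. kurtosis -> a single field named after the result
            message.put(result.getKey(), value);
          }
          messages.add(message);
        }
        return messages;
      }
    }
    ```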
    
    
    **Longer Term**
    
    This is where, in my mind, writer-focused morphs into 'writer configuration'-focused, which is to say not just the transport, but also the destination.  In this world, we can directly associate the representation of what the profiler writes with its destination.  Our points of configuration for new writers in Metron are the `MessageWriter` and `BulkMessageWriter` interfaces.  We recently pulled the writer configs out into their own indexing configs, keyed by writer (kafka, elasticsearch, etc.).  Imagine that the writers are configured entirely there, and that the configuration is use-case oriented rather than writer oriented.  Instead of what we have now in the indexing config, we could have:
    ```
    {
      "writers" : {
        "kafka" : {
          "batchSize" : 1,
          "enabled" : true
        },
        "hbase_profile" : {
          "batchSize" : 5,
          "enabled" : true
        }
      },
      "endpoints" : {
        "triage" : {
          "writer" : "kafka",
          "queue" : "enrichments"
        },
        "profile" : {
          "writer" : "hbase_profile",
          "table" : "profile:P"
        }
      }
    }
    ```
    
    Here, the two forms merge into one, because we can use our core abstractions to represent the capability-driven design that you are focused on, @nickwallen.  In this world, the profiler is simple: it just writes messages out to the indexing topology.  The structure of the tuple looks like:
    * message
    * endpoint
    
    The indexing topology will use the source type to pull the config and, since the endpoint is specified in the tuple, it will use the endpoint to write the message to the appropriately configured destination.
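    Under this design, the indexing topology only needs the endpoint name from the tuple to find its writer. A rough sketch, assuming the `endpoints` section of the example indexing config has already been parsed into maps. `EndpointRouter` and `writerFor` are hypothetical and are not existing Metron classes.

    ```java
    import java.util.Map;

    // Hypothetical lookup of the writer configured for an endpoint.
    final class EndpointRouter {

      // endpoint name -> settings, e.g. "triage" -> { writer: kafka, queue: enrichments }
      private final Map<String, Map<String, String>> endpoints;

      EndpointRouter(Map<String, Map<String, String>> endpoints) {
        this.endpoints = endpoints;
      }

      // Returns the writer name for the endpoint carried in the tuple.
      String writerFor(String endpoint) {
        Map<String, String> settings = endpoints.get(endpoint);
        if (settings == null || !settings.containsKey("writer")) {
          throw new IllegalArgumentException("no writer configured for endpoint: " + endpoint);
        }
        return settings.get("writer");
      }
    }
    ```

    Failing loudly on an unknown endpoint is a design choice in this sketch; silently dropping the message would hide a misconfigured profile.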
    
    In this world, the example in the medium term does not change:
    ```
    {
      "profiles": [
        {
          "profile": "test",
          "foreach": "'global'",
          "onlyif": "source.type == 'squid'",
          "update":  { "stats": "STATS_ADD(stats, LENGTH(url))" },
          "result":  {
             "profile" : {
                          "stats" :  "stats",
                          "count" : "STATS_COUNT(stats)"
                         },
             "triage" : {
                          "baseline_stats" : "{ 'mean' : STATS_MEAN(stats), 'stddev' : STATS_SD(stats) }",
                          "kurtosis" : "STATS_KURTOSIS(stats)"
                        }
          }
        }
      ]
    }
    ```
    `profile` and `triage` are interpreted as endpoint names, which can be looked up in the indexing configuration.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    I think what I mean is a little different (but maybe I've missed your point).
    
    For example, when @james-sirota first reviewed this PR, he was confused about why we would send data to Kafka.  He thought it was a replacement for HBase, rather than an addition to it.  His mistake was totally understandable.  Using terms like 'kafka' and 'hbase' forces the user to understand why they would want to send profile data to HBase and why they would want to send profile data to Kafka.  It forces the user to know the implementation.
    
    But I am saying that users should not need to know the implementation details of the Profiler.  They should just tell us if they want the profile data stored for later and whether they want to triage the data from the Profiler.
    
    So I am suggesting that to be "user focused" we use terms that focus on the functionality from the user's perspective, not terms based on how we've implemented the Profiler.  A user would tell us to 'triage' or not; they would not tell us 'kafka' or not.




[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r103496145
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    @@ -0,0 +1,106 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.bolt;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.json.simple.JSONObject;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Handles emitting a ProfileMeasurement to the stream which writes
    + * profile measurements to Kafka.
    + */
    +public class KafkaDestinationHandler implements DestinationHandler, Serializable {
    +
    +  /**
    +   * The stream identifier used for this destination;
    +   */
    +  private String streamId = "kafka";
    +
    +  /**
    +   * The 'source.type' of messages originating from the Profiler.
    +   */
    +  private String sourceType = "profiler";
    +
    +  @Override
    +  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    // the kafka writer expects a field named 'message'
    +    declarer.declareStream(getStreamId(), new Fields("message"));
    +  }
    +
    +  @Override
    +  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
    +
    +    JSONObject message = new JSONObject();
    +    message.put("profile", measurement.getDefinition().getProfile());
    +    message.put("entity", measurement.getEntity());
    +    message.put("period", measurement.getPeriod().getPeriod());
    +    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
    +    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
    +    message.put("timestamp", System.currentTimeMillis());
    +    message.put("source.type", sourceType);
    +    message.put("is_alert", "true");
    +
    +    // append each of the triage values to the message
    +    measurement.getTriageValues().forEach((key, value) -> {
    +
    +      if(isValidType(value)) {
    +        message.put(key, value);
    --- End diff --
    
    @cestella said...
    
    >     {
    >       "profile": "test",
    >       "foreach": "'global'",
    >       "onlyif": "source.type == 'bro'",
    >       "init":    { "count": "0" },
    >       "update":  { "count": "count + 1" },
    >       "result": {
    >           "profile": "count",
    >           "triage": "{ 'blah' : count, 'zork' : 'zork'}"
    >       }
    >     }
    > 
    > Will I get messages in kafka that look like:
    > 
    > ```
    > {
    > "period.start":1488233820000
    > ,"period":24803897
    > ,"profile":"test"
    > ,"blah":161
    > ,"zork":"zork"
    > ,"period.end":1488233880000
    > ,"is_alert":"true"
    > ,"entity":"global"
    > ,"timestamp":1488233841600
    > ,"source.type":"profiler"
    > }
    > ```
    
    Yes, that is exactly what you would get.  The keys 'blah' and 'zork' would be added to the message with the result of evaluating the associated triage expression.
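    The message above consists of the fixed Profiler fields plus one entry per key of the evaluated triage map. The following is a simplified sketch of that assembly. It uses plain `java.util` maps instead of `JSONObject` and `ProfileMeasurement`, and `TriageMessage.build` and its parameters are hypothetical stand-ins for the handler's `emit` method, not the code under review.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical, dependency-free version of the message assembly in emit().
    final class TriageMessage {

      static Map<String, Object> build(String profile, String entity, long period,
                                       long periodStart, long periodEnd, long timestamp,
                                       Map<String, Object> triageValues) {
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("profile", profile);
        message.put("entity", entity);
        message.put("period", period);
        message.put("period.start", periodStart);
        message.put("period.end", periodEnd);
        message.put("timestamp", timestamp);
        message.put("source.type", "profiler");
        message.put("is_alert", "true");
        // each triage key ('blah', 'zork', ...) becomes a top-level field
        message.putAll(triageValues);
        return message;
      }
    }
    ```

    Because triage keys are merged at the top level, a triage key such as `profile` would overwrite a fixed field; this sketch does not guard against that.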



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    > Am I missing something? Is there a way to define the topic dynamically while using the BulkMessageWriterBolt & KafkaMessageWriter classes unchanged?
    
    Created [METRON-738](https://issues.apache.org/jira/browse/METRON-738) to track this 'wish list' enhancement.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    @nickwallen I agree with you about not exposing implementation details via our API.  I think it is better to name things according to feature/function, not the underlying implementation.
    
    @cestella I like the summary you provide for short/medium/long term solutions to this problem. I especially like the idea of being able to customize the writers used for the various endpoints.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    Ok, good.  I don't see any problems there.  Will use the 'enrichments' queue.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    Also, I'll point out that this sets us up architecturally to pull writer configs from ZooKeeper in the future and to support a series of other writers for the output of the profiler.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    @nickwallen Hmm, how about the following, with `profile` and `triage` here being entirely user-specifiable names that break up the various ways you write:
    
    ```
    {
      "profiles": [
        {
          "profile": "test",
          "foreach": "'global'",
          "onlyif": "source.type == 'squid'",
          "update":  { "stats": "STATS_ADD(stats, LENGTH(url))" },
          "result":  {
             "profile" : {
                             "output" : "stats",
                             "if" : "STATS_COUNT(stats) > 0",
                             "writer" : "hbase"
                             },
             "triage" : {
                           "output" : "{ 'mean' : STATS_MEAN(stats), 'stddev' : STATS_SD(stats) }",
                           "if" : "STATS_COUNT(stats) > 0 && STATS_SD(stats) > 10",
                           "writer" : "kafka"
                            }
           }     
        }
      ]
    }
    ``` 
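    Each named output in this proposal carries an expression, an `if` guard, and a writer. Stellar cannot be evaluated in a standalone snippet, so the sketch below models the expressions as Java functions over a small stats holder. `OutputSpec`, `Stats`, and `emitAll` are all hypothetical. The sketch only illustrates the control flow: an output is evaluated, and would be written, only when its guard passes.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.function.Predicate;

    // Hypothetical stand-in for the state built by STATS_ADD.
    final class Stats {
      final long count;
      final double mean;
      final double sd;
      Stats(long count, double mean, double sd) {
        this.count = count;
        this.mean = mean;
        this.sd = sd;
      }
    }

    // Hypothetical model of one named output: { "output", "if", "writer" }.
    final class OutputSpec {
      final Function<Stats, Object> output;
      final Predicate<Stats> condition;
      final String writer;
      OutputSpec(Function<Stats, Object> output, Predicate<Stats> condition, String writer) {
        this.output = output;
        this.condition = condition;
        this.writer = writer;
      }

      // Evaluates each output whose guard passes; key = output name, in config order.
      static Map<String, Object> emitAll(Map<String, OutputSpec> specs, Stats stats) {
        Map<String, Object> written = new LinkedHashMap<>();
        for (Map.Entry<String, OutputSpec> e : specs.entrySet()) {
          OutputSpec spec = e.getValue();
          if (spec.condition.test(stats)) {
            // a real implementation would hand this value to spec.writer
            written.put(e.getKey(), spec.output.apply(stats));
          }
        }
        return written;
      }
    }
    ```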



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    I think this is ready for final review.  Come one, come all.  Would love to get this closed out.



[GitHub] incubator-metron issue #449: METRON-701 Triage Metrics Produced by the Profi...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on the issue:

    https://github.com/apache/incubator-metron/pull/449
  
    Outside the scope of your "multiple result" idea that I need to think more on...
    
    The one thing I did not like about both approaches is the terminology.  Kind of silly, but important for usability.  
    
    What I mean is the use of the terms 'hbase' and 'kafka'.  I don't think they make clear to the user why they would want to choose one over the other.  I would really like to find non-implementation-specific terms that better describe the function of each.
    
    Random thoughts...
    * 'kafka' -> 'triage' or ??
    * 'hbase' -> 'store' or 'profile' or ??
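    One way to keep implementation names out of the profile definition is to map the functional names to internal streams in a single place. This is a minimal sketch using the tentative names from the list above. The `Output` enum and its stream ids are hypothetical.

    ```java
    // Hypothetical mapping of user-facing output names to internal streams.
    enum Output {
      TRIAGE("triage", "kafka"),    // send values to threat triage
      PROFILE("profile", "hbase");  // store values for later retrieval

      final String configKey; // what the user writes in the profile definition
      final String streamId;  // what the topology uses internally

      Output(String configKey, String streamId) {
        this.configKey = configKey;
        this.streamId = streamId;
      }

      static Output fromConfigKey(String key) {
        for (Output o : values()) {
          if (o.configKey.equals(key)) {
            return o;
          }
        }
        throw new IllegalArgumentException("unknown output: " + key);
      }
    }
    ```

    The user only ever sees `triage` and `profile`. Swapping the transport later would touch only the `streamId` values.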



[GitHub] incubator-metron pull request #449: METRON-701 Triage Metrics Produced by th...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/449#discussion_r103496563
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java ---
    @@ -0,0 +1,106 @@
    +/*
    + *  Licensed to the Apache Software Foundation (ASF) under one
    + *  or more contributor license agreements.  See the NOTICE file
    + *  distributed with this work for additional information
    + *  regarding copyright ownership.  The ASF licenses this file
    + *  to you under the Apache License, Version 2.0 (the
    + *  "License"); you may not use this file except in compliance
    + *  with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + *  Unless required by applicable law or agreed to in writing, software
    + *  distributed under the License is distributed on an "AS IS" BASIS,
    + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + *  See the License for the specific language governing permissions and
    + *  limitations under the License.
    + *
    + */
    +
    +package org.apache.metron.profiler.bolt;
    +
    +import org.apache.commons.lang3.ClassUtils;
    +import org.apache.metron.profiler.ProfileMeasurement;
    +import org.apache.storm.task.OutputCollector;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Fields;
    +import org.apache.storm.tuple.Values;
    +import org.json.simple.JSONObject;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Handles emitting a ProfileMeasurement to the stream which writes
    + * profile measurements to Kafka.
    + */
    +public class KafkaDestinationHandler implements DestinationHandler, Serializable {
    +
    +  /**
    +   * The stream identifier used for this destination;
    +   */
    +  private String streamId = "kafka";
    +
    +  /**
    +   * The 'source.type' of messages originating from the Profiler.
    +   */
    +  private String sourceType = "profiler";
    +
    +  @Override
    +  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +    // the kafka writer expects a field named 'message'
    +    declarer.declareStream(getStreamId(), new Fields("message"));
    +  }
    +
    +  @Override
    +  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
    +
    +    JSONObject message = new JSONObject();
    +    message.put("profile", measurement.getDefinition().getProfile());
    +    message.put("entity", measurement.getEntity());
    +    message.put("period", measurement.getPeriod().getPeriod());
    +    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
    +    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
    +    message.put("timestamp", System.currentTimeMillis());
    +    message.put("source.type", sourceType);
    +    message.put("is_alert", "true");
    +
    +    // append each of the triage values to the message
    +    measurement.getTriageValues().forEach((key, value) -> {
    +
    +      if(isValidType(value)) {
    +        message.put(key, value);
    +
    +      } else {
    +        throw new IllegalArgumentException(String.format("invalid type for triage value: profile=%s, entity=%s, key=%s, value=%s",
    --- End diff --
    
    It currently doesn't kill the topology.  It would just impact the flush of any other profiles that happen to be handled by the same bolt instance.  But yes, it needs to be fixed.
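    One possible fix is to isolate failures per measurement. An invalid triage value in one profile would then be reported and skipped instead of aborting the flush of every other profile handled by the same bolt. The sketch below is hypothetical: `FlushSketch`, the `Consumer`-based emitter, and the returned error list stand in for the bolt, the destination handler, and logging.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    // Hypothetical flush loop that contains failures to the offending measurement.
    final class FlushSketch {

      // Emits every measurement; returns the error messages for those that failed.
      static <T> List<String> flush(List<T> measurements, Consumer<T> emitter) {
        List<String> errors = new ArrayList<>();
        for (T measurement : measurements) {
          try {
            emitter.accept(measurement);
          } catch (IllegalArgumentException e) {
            // record (or log) the failure and continue with the remaining measurements
            errors.add(e.getMessage());
          }
        }
        return errors;
      }
    }
    ```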

