Posted to issues@metron.apache.org by nickwallen <gi...@git.apache.org> on 2018/04/02 17:34:42 UTC

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

GitHub user nickwallen opened a pull request:

    https://github.com/apache/metron/pull/977

    METRON-1505 Intermittent Profiler Integration Test Failure

    ### Problem
    
    The integration tests were failing intermittently when Storm unexpectedly expired messages generated by the integration tests.  When Storm expired these messages they were never received by the Profiler bolts, which caused the integration tests to fail.
    
    ### Root Cause
    
    Storm's event window mechanism was not configured correctly to use the timestamp extracted from the telemetry message.  Storm was instead defaulting to system time.  
    
    If the time when the downstream `ProfileBuilderBolt` processed a message differed significantly enough from when the upstream `ProfileSplitterBolt` processed the message, the message would be errantly expired by Storm.  
    
    This is why the problem could only be replicated when run in Travis, a resource-constrained environment.  When run in any other environment, the system times at which these two events occur do not differ enough for Storm to mistakenly expire the test messages.
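
The timing dependence described above can be pictured with a deliberately simplified model. This is a sketch, not Storm's windowing code: `ExpirySketch`, `isExpired` and `toleranceMillis` are hypothetical names, and the rule "expire when the two observed times drift too far apart" is an assumption taken from the description in this PR.

```java
/**
 * Simplified model of the failure mode; not Storm's actual windowing logic.
 * All names and the tolerance rule are illustrative assumptions.
 */
final class ExpirySketch {

  /** Treats a message as expired when the two observed times differ by more than the tolerance. */
  static boolean isExpired(long upstreamMillis, long downstreamMillis, long toleranceMillis) {
    return Math.abs(downstreamMillis - upstreamMillis) > toleranceMillis;
  }

  public static void main(String[] args) {
    long tolerance = 1_000L;

    // system time: each bolt reads its own clock, so a slow host widens the gap
    System.out.println(isExpired(10_000L, 10_050L, tolerance));
    System.out.println(isExpired(10_000L, 13_000L, tolerance));

    // event time: both bolts read the same timestamp from the message itself
    long eventTime = 10L;
    System.out.println(isExpired(eventTime, eventTime, tolerance));
  }
}
```

With event time, the gap no longer depends on how busy the host is, which is consistent with the failure only showing up on Travis.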
    
    This did not necessarily matter for the core functioning of the Profiler, as the Profiler itself continued to use the correct event timestamps.  This bug only affected significantly out-of-order messages and the flushing of expired profiles for the integration tests.
    
    ### The Fix
    
    The simple fix was to ensure that Storm uses the correct event timestamp field.  Doing this highlighted another problem.  Storm does not work correctly when using tick tuples along with an event timestamp field.  Storm will attempt to extract an event timestamp from the tick tuple, which does not contain one, and this causes the entire topology to fail.
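
The tick tuple problem can be illustrated with a small model in which a tuple is just a map of field names to values. This is only a sketch under that assumption: `TickTupleSketch` and `extractTimestamp` are hypothetical and do not use Storm's `Tuple` API.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model: a tuple is a map of field names to values. Not Storm's Tuple API. */
final class TickTupleSketch {

  /** Reads the event timestamp from the named field and fails if the field is absent. */
  static long extractTimestamp(Map<String, Object> tuple, String timestampField) {
    Object value = tuple.get(timestampField);
    if (!(value instanceof Long)) {
      throw new IllegalArgumentException("No timestamp in field '" + timestampField + "'");
    }
    return (Long) value;
  }

  public static void main(String[] args) {
    Map<String, Object> telemetry = new HashMap<>();
    telemetry.put("timestamp", 10L);
    System.out.println(extractTimestamp(telemetry, "timestamp"));

    // a tick tuple carries no telemetry fields, so the extraction fails
    Map<String, Object> tick = new HashMap<>();
    try {
      extractTimestamp(tick, "timestamp");
    } catch (IllegalArgumentException e) {
      System.out.println("tick tuple rejected: " + e.getMessage());
    }
  }
}
```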
    
    This meant that I could not use tick tuples.  To work around this, I created a separate thread that flushes the expired profiles regularly.  The separate thread introduces thread safety concerns, so I also needed to perform some locking.
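
A recurring flush on a separate thread can be sketched with the JDK's `ScheduledExecutorService`. This is an assumption for illustration only; the PR uses its own timer, and `FlushTimerSketch`, `start` and `stop` are hypothetical names. Locking of shared state is deliberately left out of this sketch.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch of a recurring flush on its own thread; not the bolt's actual timer. */
final class FlushTimerSketch {

  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "flush-expired-profiles-timer");
        t.setDaemon(true);  // do not keep the JVM alive just for flushing
        return t;
      });

  /** Runs the flush immediately and then once per period. */
  void start(Runnable flushExpired, long periodMillis) {
    executor.scheduleAtFixedRate(flushExpired, 0, periodMillis, TimeUnit.MILLISECONDS);
  }

  /** Stops scheduling further flushes; a flush already running is allowed to finish. */
  void stop() {
    executor.shutdown();
  }
}
```

Note that `scheduleAtFixedRate` suppresses all later runs if the task throws, so a real flush task would likely need to catch and log its own exceptions.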
    
    ### Changes
    
    Most of these changes were done in separate commits to make review easier.
    
    1. Added a separate thread to the `ProfileBuilderBolt` to flush expired profiles regularly.  This is the core fix to the integration test bug.
    
    2. Corrected the key generated to cache `ProfileBuilder` objects.  This previously relied on the underlying `ProfileConfig.toString` method, which was error-prone and slow.  It now uses the hash codes of the profile and the entity.
    
    3. Reduced the number of Profiler integration tests.  There is now one integration test that tests event time processing and another that tests the same profile using processing time.
    
        Previously there were a number of different profiles that were tested.  This was necessary before as the integration tests were the only effective way to test different profile logic.  Since then, significant refactoring has occurred which allowed the same logic to be tested in unit tests rather than in integration tests.  
    
        This allowed me to clean up these tests, which reduces the run time and complexity of the integration tests.
    
    4. Added some simple debug logging to `HBaseBolt`.
    
    ## Pull Request Checklist
    
    - [ ] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
    - [ ] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    - [ ] Have you included steps to reproduce the behavior or problem that is being changed or addressed?
    - [ ] Have you included steps or a guide to how the change may be verified and tested manually?
    - [ ] Have you ensured that the full suite of tests and checks have been executed in the root metron folder?
    - [ ] Have you written or updated unit tests and or integration tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickwallen/metron METRON-1505

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/977.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #977
    
----
commit e9185e2249bfcca1c47d3809e05bf742a73ff80e
Author: Nick Allen <ni...@...>
Date:   2018-03-26T12:13:26Z

    Added debug logging to HbaseBolt

commit ab3e6ad2cefb236a3de8a7c3dec3c100761ffc57
Author: Nick Allen <ni...@...>
Date:   2018-03-26T15:34:13Z

    Corrected cache key usage and logging

commit 1cda9523fe65ce8ef9ec40305b873e6ed620cc78
Author: Nick Allen <ni...@...>
Date:   2018-03-23T22:48:38Z

    A separate timer thread is used to flush expired profiles.

----


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r178600804
  
    --- Diff: metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java ---
    @@ -70,247 +66,103 @@
       private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler/src/test";
       private static final String FLUX_PATH = "../metron-profiler/src/main/flux/profiler/remote.yaml";
     
    -  /**
    -   * {
    -   * "ip_src_addr": "10.0.0.1",
    -   * "protocol": "HTTPS",
    -   * "length": 10,
    -   * "bytes_in": 234
    -   * }
    -   */
    -  @Multiline
    -  private static String message1;
    -
    -  /**
    -   * {
    -   * "ip_src_addr": "10.0.0.2",
    -   * "protocol": "HTTP",
    -   * "length": 20,
    -   * "bytes_in": 390
    -   * }
    -   */
    -  @Multiline
    -  private static String message2;
    -
    -  /**
    -   * {
    -   * "ip_src_addr": "10.0.0.3",
    -   * "protocol": "DNS",
    -   * "length": 30,
    -   * "bytes_in": 560
    -   * }
    -   */
    -  @Multiline
    -  private static String message3;
    -
    -  private static ColumnBuilder columnBuilder;
    -  private static ZKServerComponent zkComponent;
    -  private static FluxTopologyComponent fluxComponent;
    -  private static KafkaComponent kafkaComponent;
    -  private static ConfigUploadComponent configUploadComponent;
    -  private static ComponentRunner runner;
    -  private static MockHTable profilerTable;
    +  public static final long startAt = 10;
    +  public static final String entity = "10.0.0.1";
     
       private static final String tableName = "profiler";
       private static final String columnFamily = "P";
    -  private static final double epsilon = 0.001;
       private static final String inputTopic = Constants.INDEXING_TOPIC;
       private static final String outputTopic = "profiles";
       private static final int saltDivisor = 10;
     
    -  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(5);
    +  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(1);
       private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5);
    -  private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(15);
    -  private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20);
    +  private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(10);
    +  private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(15);
       private static final long maxRoutesPerBolt = 100000;
     
    -  /**
    -   * Tests the first example contained within the README.
    -   */
    -  @Test
    -  public void testExample1() throws Exception {
    -
    -    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
    -
    -    // start the topology and write test messages to kafka
    -    fluxComponent.submitTopology();
    -    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
    -    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
    -    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
    -
    -    // verify - ensure the profile is being persisted
    -    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
    -            timeout(seconds(180)));
    -
    -    // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
    -    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
    -            columnBuilder.getColumnQualifier("value"), Double.class);
    -
    -    // verify - there are 3 'HTTP' each with 390 bytes
    -    Assert.assertTrue(actuals.stream().anyMatch(val ->
    -            MathUtils.equals(390.0 * 3, val, epsilon)
    -    ));
    -  }
    -
    -  /**
    -   * Tests the second example contained within the README.
    -   */
    -  @Test
    -  public void testExample2() throws Exception {
    -
    -    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
    -
    -    // start the topology and write test messages to kafka
    -    fluxComponent.submitTopology();
    -    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
    -    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
    -    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
    -
    -    // expect 2 values written by the profile; one for 10.0.0.2 and another for 10.0.0.3
    -    final int expected = 2;
    -
    -    // verify - ensure the profile is being persisted
    -    waitOrTimeout(() -> profilerTable.getPutLog().size() >= expected,
    -            timeout(seconds(90)));
    -
    -    // verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 'HTTP' and 10.0.0.3 send 'DNS'
    -    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
    -            columnBuilder.getColumnQualifier("value"), Double.class);
    -
    -    // verify - 10.0.0.3 -> 1/4
    -    Assert.assertTrue( "Could not find a value near 1/4. Actual values read are are: " + Joiner.on(",").join(actuals),
    -            actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/4.0, epsilon)
    -    ));
    -
    -    // verify - 10.0.0.2 -> 4/1
    -    Assert.assertTrue("Could not find a value near 4. Actual values read are are: " + Joiner.on(",").join(actuals),
    -            actuals.stream().anyMatch(val -> MathUtils.equals(val, 4.0/1.0, epsilon)
    -    ));
    -  }
    -
    -  /**
    -   * Tests the third example contained within the README.
    -   */
    -  @Test
    -  public void testExample3() throws Exception {
    -
    -    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-3");
    +  private static ColumnBuilder columnBuilder;
    +  private static ZKServerComponent zkComponent;
    +  private static FluxTopologyComponent fluxComponent;
    +  private static KafkaComponent kafkaComponent;
    +  private static ConfigUploadComponent configUploadComponent;
    +  private static ComponentRunner runner;
    +  private static MockHTable profilerTable;
     
    -    // start the topology and write test messages to kafka
    -    fluxComponent.submitTopology();
    -    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
    -    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
    -    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
    -
    -    // verify - ensure the profile is being persisted
    -    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
    -            timeout(seconds(90)));
    -
    -    // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
    -    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
    -            columnBuilder.getColumnQualifier("value"), Double.class);
    -
    -    // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
    -    Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
    -            actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)
    -    ));
    -  }
    +  private static String message1;
    +  private static String message2;
    +  private static String message3;
     
       /**
    -   * Tests the fourth example contained within the README.
    +   * The Profiler can generate profiles based on processing time.  With processing time,
    +   * the Profiler builds profiles based on when the telemetry was processed.
    +   *
    +   * <p>Not defining a 'timestampField' within the Profiler configuration tells the Profiler
    +   * to use processing time.
        */
       @Test
    -  public void testExample4() throws Exception {
    +  public void testProcessingTime() throws Exception {
    --- End diff --
    
    Integration tests now only have a `testProcessingTime` and `testEventTime`.
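
The difference between the two tests comes down to where a message's timestamp is taken from. A minimal sketch, assuming a message is a map of fields: `TimestampSketch` and `resolve` are hypothetical names, and throwing on a missing field is a choice made for this sketch, not necessarily what the Profiler does.

```java
import java.util.Map;
import java.util.Optional;

/** Sketch of choosing between event time and processing time; names are illustrative. */
final class TimestampSketch {

  /**
   * @param message        The telemetry message as field/value pairs.
   * @param timestampField The configured field, or empty to use processing time.
   * @param processingTime The wall-clock time at which the message is handled.
   * @return The timestamp used to build the profile.
   */
  static long resolve(Map<String, Object> message, Optional<String> timestampField, long processingTime) {
    if (!timestampField.isPresent()) {
      return processingTime;  // no 'timestampField' configured
    }
    Object value = message.get(timestampField.get());
    if (!(value instanceof Number)) {
      throw new IllegalArgumentException("Missing or non-numeric field: " + timestampField.get());
    }
    return ((Number) value).longValue();
  }
}
```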


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r178600343
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---
    @@ -310,17 +313,37 @@ public void execute(TupleWindow window) {
       }
     
       /**
    -   * Flush all expired profiles when a 'tick' is received.
    +   * Flush all active profiles.
    +   */
    +  protected void flushActive() {
    +    activeFlushSignal.reset();
    +
    +    // flush the active profiles
    +    List<ProfileMeasurement> measurements;
    +    synchronized(messageDistributor) {
    +      measurements = messageDistributor.flush();
    +      emitMeasurements(measurements);
    +    }
    +
    +    LOG.debug("Flushed active profiles and found {} measurement(s).", measurements.size());
    +
    +  }
    +
    +  /**
    +   * Flushes all expired profiles.
        *
    -   * If a profile has not received a message for an extended period of time then it is
    +   * <p>If a profile has not received a message for an extended period of time then it is
        * marked as expired.  Periodically we need to flush these expired profiles to ensure
        * that their state is not lost.
        */
    -  private void handleTick() {
    +  protected void flushExpired() {
     
         // flush the expired profiles
    -    List<ProfileMeasurement> measurements = messageDistributor.flushExpired();
    -    emitMeasurements(measurements);
    +    List<ProfileMeasurement> measurements;
    +    synchronized (messageDistributor) {
    --- End diff --
    
    Access to the `messageDistributor` has to be synchronized now.  It is not thread-safe and can be accessed from either the timer thread or the thread that handles incoming tuples.


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r178599132
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---
    @@ -262,11 +262,19 @@ public ProfileBuilder getBuilder(MessageRoute route, Context context) throws Exe
       /**
        * Builds the key that is used to lookup the {@link ProfileBuilder} within the cache.
        *
    +   * <p>The cache key is built using the hash codes of the profile and entity name.  If the profile
    +   * definition is ever changed, the same cache entry will not be reused.  This ensures that no
    +   * state can be carried over from the old definition into the new, which might result in an
    +   * invalid profile measurement.
    +   *
        * @param profile The profile definition.
        * @param entity The entity.
        */
    -  private String cacheKey(ProfileConfig profile, String entity) {
    -    return format("%s:%s", profile, entity);
    +  private int cacheKey(ProfileConfig profile, String entity) {
    +    return new HashCodeBuilder(17, 37)
    --- End diff --
    
    The cache key needs to ensure that when the user changes a profile definition, even slightly, a different `ProfileBuilder` is used.  Reusing the same `ProfileBuilder` would create inconsistent results.
    
    Instead of using `ProfileConfig.toString()` as part of the cache key, it now uses the hash code from the profile and the entity.  I think this is less error prone and more performant.
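
The idea of a hash-based cache key can be sketched as follows. The sketch uses `java.util.Objects.hash` in place of the `HashCodeBuilder` seen in the diff, and `Definition` is a hypothetical stand-in for `ProfileConfig` with only two fields.

```java
import java.util.Objects;

/** Sketch of a hash-based cache key; uses java.util.Objects instead of HashCodeBuilder. */
final class CacheKeySketch {

  /** Hypothetical stand-in for a profile definition. */
  static final class Definition {
    final String profile;
    final String update;

    Definition(String profile, String update) {
      this.profile = profile;
      this.update = update;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Definition)) {
        return false;
      }
      Definition other = (Definition) o;
      return profile.equals(other.profile) && update.equals(other.update);
    }

    @Override
    public int hashCode() {
      return Objects.hash(profile, update);
    }
  }

  /** Combines the hash codes of the definition and the entity into one key. */
  static int cacheKey(Definition definition, String entity) {
    return Objects.hash(definition, entity);
  }
}
```

Two equal definitions always produce the same key for the same entity. Because an `int` hash can collide, two different definitions could in principle map to the same key; the sketch does not address that case.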


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r178599870
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---
    @@ -395,10 +420,46 @@ private void emitMeasurements(List<ProfileMeasurement> measurements) {
         return value;
       }
     
    +  /**
    +   * Converts milliseconds to seconds and handles an ugly cast.
    +   *
    +   * @param millis Duration in milliseconds.
    +   * @return Duration in seconds.
    +   */
    +  private int toSeconds(long millis) {
    +    return (int) TimeUnit.MILLISECONDS.toSeconds(millis);
    +  }
    +
    +  /**
    +   * Creates a timer that regularly flushes expired profiles on a separate thread.
    +   */
    +  private void startExpiredFlushTimer() {
    +
    +    expiredFlushTimer = createTimer("flush-expired-profiles-timer");
    +    expiredFlushTimer.scheduleRecurring(0, toSeconds(profileTimeToLiveMillis), () -> flushExpired());
    +  }
    --- End diff --
    
    This is the timer thread that flushes expired profiles regularly.
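
The `toSeconds` helper in this diff uses a plain `(int)` cast. A variant that fails loudly on overflow can be sketched with `Math.toIntExact`; this substitution is an illustration only and is not part of the PR.

```java
import java.util.concurrent.TimeUnit;

/** Sketch of a millisecond-to-second conversion that rejects overflow. */
final class ToSecondsSketch {

  /** Converts milliseconds to whole seconds, throwing ArithmeticException if the result exceeds an int. */
  static int toSeconds(long millis) {
    return Math.toIntExact(TimeUnit.MILLISECONDS.toSeconds(millis));
  }
}
```

The conversion truncates, so a TTL under one second becomes 0 seconds. How the timer treats a zero period is not shown in this thread.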


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r179871500
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---
    @@ -262,11 +262,19 @@ public ProfileBuilder getBuilder(MessageRoute route, Context context) throws Exe
       /**
        * Builds the key that is used to lookup the {@link ProfileBuilder} within the cache.
        *
    +   * <p>The cache key is built using the hash codes of the profile and entity name.  If the profile
    +   * definition is ever changed, the same cache entry will not be reused.  This ensures that no
    +   * state can be carried over from the old definition into the new, which might result in an
    +   * invalid profile measurement.
    +   *
        * @param profile The profile definition.
        * @param entity The entity.
        */
    -  private String cacheKey(ProfileConfig profile, String entity) {
    -    return format("%s:%s", profile, entity);
    +  private int cacheKey(ProfileConfig profile, String entity) {
    +    return new HashCodeBuilder(17, 37)
    --- End diff --
    
    Thanks for the explanation @nickwallen, makes sense.


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/metron/pull/977


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r179860237
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---
    @@ -262,11 +262,19 @@ public ProfileBuilder getBuilder(MessageRoute route, Context context) throws Exe
       /**
        * Builds the key that is used to lookup the {@link ProfileBuilder} within the cache.
        *
    +   * <p>The cache key is built using the hash codes of the profile and entity name.  If the profile
    +   * definition is ever changed, the same cache entry will not be reused.  This ensures that no
    +   * state can be carried over from the old definition into the new, which might result in an
    +   * invalid profile measurement.
    +   *
        * @param profile The profile definition.
        * @param entity The entity.
        */
    -  private String cacheKey(ProfileConfig profile, String entity) {
    -    return format("%s:%s", profile, entity);
    +  private int cacheKey(ProfileConfig profile, String entity) {
    +    return new HashCodeBuilder(17, 37)
    --- End diff --
    
    I'm not as familiar with this functionality - How do we cut over/end an existing profile when a profile definition is changed? Is there any continuity in the calculations or is it an immediate start over from scratch?


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r179868541
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---
    @@ -262,11 +262,19 @@ public ProfileBuilder getBuilder(MessageRoute route, Context context) throws Exe
       /**
        * Builds the key that is used to lookup the {@link ProfileBuilder} within the cache.
        *
    +   * <p>The cache key is built using the hash codes of the profile and entity name.  If the profile
    +   * definition is ever changed, the same cache entry will not be reused.  This ensures that no
    +   * state can be carried over from the old definition into the new, which might result in an
    +   * invalid profile measurement.
    +   *
        * @param profile The profile definition.
        * @param entity The entity.
        */
    -  private String cacheKey(ProfileConfig profile, String entity) {
    -    return format("%s:%s", profile, entity);
    +  private int cacheKey(ProfileConfig profile, String entity) {
    +    return new HashCodeBuilder(17, 37)
    --- End diff --
    
    That state is maintained in a `ProfileBuilder` stored in this cache.  If the profile definition changes, the cache key would change, which would force it to start using a different `ProfileBuilder` instance. 
    
    Say I had a v1.0 of the profile that has been running and now I make changes, so I'll call that v2.0 of the profile.  We'd have a `ProfileBuilder` that handles v1.0 of the profile definition and another that handles v2.0 of the profile.
    
    The v1.0 instance will stop receiving messages because that profile definition no longer exists.  The TTL for the profile will lapse and the profile will be marked as "expired".  Then periodically this timer thread will trigger a flush of all expired profiles.  The state that was in v1.0 will then be flushed and stored.
    
    The v2.0 instance will start receiving messages and building its state.  This instance will remain "active" because it is receiving messages.  This active profile will flush when the period expires and its state will be stored.
    
    It is not safe to mix state when a profile definition is changed by a user.  You don't know how the profile was changed and whether the change was compatible or not. 
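
The life cycle described above (active, then expired once the TTL lapses, then flushed) can be sketched with an explicit clock. This is a simplified model and not Metron's cache: `ProfileLifecycleSketch`, `Builder`, `expireIdle` and the string-valued state are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Sketch of the active/expired life cycle with an explicit clock; not Metron's cache. */
final class ProfileLifecycleSketch {

  /** Hypothetical builder holding some state and the time of its last message. */
  static final class Builder {
    final String state;
    final long lastSeenMillis;

    Builder(String state, long lastSeenMillis) {
      this.state = state;
      this.lastSeenMillis = lastSeenMillis;
    }
  }

  private final long ttlMillis;
  private final Map<Integer, Builder> active = new HashMap<>();
  private final Map<Integer, Builder> expired = new HashMap<>();

  ProfileLifecycleSketch(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  /** Records a message for the builder identified by the cache key. */
  void distribute(int cacheKey, String state, long now) {
    active.put(cacheKey, new Builder(state, now));
  }

  /** Moves builders that have been idle for at least the TTL into the expired set. */
  void expireIdle(long now) {
    Iterator<Map.Entry<Integer, Builder>> it = active.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Integer, Builder> entry = it.next();
      if (now - entry.getValue().lastSeenMillis >= ttlMillis) {
        expired.put(entry.getKey(), entry.getValue());
        it.remove();
      }
    }
  }

  /** Drains the expired set and returns the state that must be stored. */
  List<String> flushExpired() {
    List<String> flushed = new ArrayList<>();
    for (Builder builder : expired.values()) {
      flushed.add(builder.state);
    }
    expired.clear();
    return flushed;
  }

  int activeCount() {
    return active.size();
  }
}
```

In this model the v1.0 builder stops receiving messages, goes idle past the TTL and is flushed once, while the v2.0 builder stays active because it keeps receiving messages.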


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r178600386
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---
    @@ -339,11 +362,13 @@ private void handleMessage(Tuple input) {
         Long timestamp = getField(TIMESTAMP_TUPLE_FIELD, input, Long.class);
     
         // keep track of time
    -    flushSignal.update(timestamp);
    +    activeFlushSignal.update(timestamp);
         
         // distribute the message
         MessageRoute route = new MessageRoute(definition, entity);
    -    messageDistributor.distribute(message, timestamp, route, getStellarContext());
    +    synchronized (messageDistributor) {
    +      messageDistributor.distribute(message, timestamp, route, getStellarContext());
    +    }
    --- End diff --
    
    Access to the `messageDistributor` has to be synchronized now.  It is not thread-safe and can be accessed from either the timer thread or the thread that handles incoming tuples.
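
The locking pattern in this diff can be sketched in isolation: every access to the shared object happens inside a `synchronized` block on that same object. `SynchronizedAccessSketch` and its nested `Distributor` are hypothetical stand-ins and do not reproduce the real `MessageDistributor` interface.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of guarding a non-thread-safe collaborator with one shared lock. */
final class SynchronizedAccessSketch {

  /** Hypothetical stand-in for a distributor that is not thread-safe. */
  static final class Distributor {
    private final List<String> pending = new ArrayList<>();

    void distribute(String message) {
      pending.add(message);
    }

    List<String> flush() {
      List<String> flushed = new ArrayList<>(pending);
      pending.clear();
      return flushed;
    }
  }

  private final Distributor distributor = new Distributor();

  /** Called on the thread that receives tuples. */
  void handleMessage(String message) {
    synchronized (distributor) {
      distributor.distribute(message);
    }
  }

  /** Called on the timer thread. */
  List<String> flush() {
    synchronized (distributor) {
      return distributor.flush();
    }
  }
}
```

Because both paths lock on the same object, a flush never observes a half-applied `distribute`, and no message is lost between the copy and the clear.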


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r178600285
  
    --- Diff: metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java ---
    @@ -310,17 +313,37 @@ public void execute(TupleWindow window) {
       }
     
       /**
    -   * Flush all expired profiles when a 'tick' is received.
    +   * Flush all active profiles.
    +   */
    +  protected void flushActive() {
    +    activeFlushSignal.reset();
    +
    +    // flush the active profiles
    +    List<ProfileMeasurement> measurements;
    +    synchronized(messageDistributor) {
    --- End diff --
    
    Access to the `messageDistributor` has to be synchronized now.  It is not thread-safe and can be accessed from either the timer thread or the thread that handles incoming tuples.


---

[GitHub] metron pull request #977: METRON-1505 Intermittent Profiler Integration Test...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/977#discussion_r178599674
  
    --- Diff: metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---
    @@ -281,29 +289,45 @@ public DefaultMessageDistributor withPeriodDuration(int duration, TimeUnit units
       /**
        * A listener that is notified when profiles expire from the active cache.
        */
    -  private class ActiveCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
    +  private class ActiveCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder> {
     
         @Override
    -    public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
    +    public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) {
     
    -      String key = notification.getKey();
           ProfileBuilder expired = notification.getValue();
    +      LOG.warn("Profile expired from active cache; profile={}, entity={}",
    +              expired.getDefinition().getProfile(),
    +              expired.getEntity());
     
    -      LOG.warn("Profile expired from active cache; key={}", key);
    -      expiredCache.put(key, expired);
    +      // add the profile to the expired cache
    +      expiredCache.put(notification.getKey(), expired);
         }
       }
     
       /**
        * A listener that is notified when profiles expire from the active cache.
        */
    -  private class ExpiredCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
    +  private class ExpiredCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder> {
     
         @Override
    -    public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
    +    public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) {
    +
    +      if(notification.wasEvicted()) {
    +
    +        // the expired profile was NOT flushed in time
    --- End diff --
    
    A profile being removed from the expired cache is only 'bad' when it is evicted.  When an eviction occurs, we get a WARN.  Otherwise, only a DEBUG is used.  This makes the logging much more useful when troubleshooting.


---

[GitHub] metron issue #977: METRON-1505 Intermittent Profiler Integration Test Failur...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on the issue:

    https://github.com/apache/metron/pull/977
  
    Can't see anything wrong with the logic here @nickwallen. +1 by inspection. Thanks for the fix.


---