You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/12/07 19:25:17 UTC

metron git commit: METRON-1810 Storm Profiler Intermittent Test Failure (nickwallen) closes apache/metron#1289

Repository: metron
Updated Branches:
  refs/heads/master e81a5c102 -> d749961da


METRON-1810 Storm Profiler Intermittent Test Failure (nickwallen) closes apache/metron#1289


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d749961d
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d749961d
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d749961d

Branch: refs/heads/master
Commit: d749961daaa77624e1cb6878548479d86c8fe668
Parents: e81a5c1
Author: nickwallen <ni...@nickallen.org>
Authored: Fri Dec 7 14:24:48 2018 -0500
Committer: nickallen <ni...@apache.org>
Committed: Fri Dec 7 14:24:48 2018 -0500

----------------------------------------------------------------------
 .../profiler/DefaultMessageDistributor.java     |  75 +++++++-------
 .../profiler/DefaultMessageDistributorTest.java |  12 +--
 metron-analytics/metron-profiler-storm/pom.xml  |  42 ++++++--
 .../integration/ProfilerIntegrationTest.java    | 103 ++++++++-----------
 .../metron/integration/utils/TestUtils.java     |   6 +-
 5 files changed, 121 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index 0e50467..ef2ff2c 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -21,11 +21,11 @@
 package org.apache.metron.profiler;
 
 import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheWriter;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalCause;
 import com.github.benmanes.caffeine.cache.RemovalListener;
 import com.github.benmanes.caffeine.cache.Ticker;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.stellar.dsl.Context;
@@ -41,7 +41,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
@@ -105,7 +104,7 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
           long periodDurationMillis,
           long profileTimeToLiveMillis,
           long maxNumberOfRoutes) {
-    this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker(), Optional.empty());
+    this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker());
   }
 
   /**
@@ -116,14 +115,12 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
    * @param maxNumberOfRoutes The max number of unique routes to maintain.  After this is exceeded, lesser
    *                          used routes will be evicted from the internal cache.
    * @param ticker The ticker used to drive time for the caches.  Only needs set for testing.
-   * @param cacheMaintenanceExecutor The executor responsible for running cache maintenance tasks. Only needed for testing.
    */
   public DefaultMessageDistributor(
           long periodDurationMillis,
           long profileTimeToLiveMillis,
           long maxNumberOfRoutes,
-          Ticker ticker,
-          Optional<Executor> cacheMaintenanceExecutor) {
+          Ticker ticker) {
 
     if(profileTimeToLiveMillis < periodDurationMillis) {
       throw new IllegalStateException(format(
@@ -138,11 +135,8 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
             .newBuilder()
             .maximumSize(maxNumberOfRoutes)
             .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
-            .removalListener(new ActiveCacheRemovalListener())
-            .ticker(ticker);
-    if (cacheMaintenanceExecutor.isPresent()) {
-      activeCacheBuilder.executor(cacheMaintenanceExecutor.get());
-    }
+            .ticker(ticker)
+            .writer(new ActiveCacheWriter());
     if (LOG.isDebugEnabled()) {
       activeCacheBuilder.recordStats();
     }
@@ -153,11 +147,8 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
             .newBuilder()
             .maximumSize(maxNumberOfRoutes)
             .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
-            .removalListener(new ExpiredCacheRemovalListener())
-            .ticker(ticker);
-    if (cacheMaintenanceExecutor.isPresent()) {
-      expiredCacheBuilder.executor(cacheMaintenanceExecutor.get());
-    }
+            .ticker(ticker)
+            .writer(new ExpiredCacheWriter());
     if (LOG.isDebugEnabled()) {
       expiredCacheBuilder.recordStats();
     }
@@ -238,10 +229,12 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
    */
   private void cacheMaintenance() {
     activeCache.cleanUp();
+    LOG.debug("Active cache maintenance triggered: cacheStats={}, size={}",
+            activeCache.stats().toString(), activeCache.estimatedSize());
+
     expiredCache.cleanUp();
-    LOG.debug("Cache maintenance triggered: activeCacheStats={}, expiredCacheStats={}",
-            activeCache.stats().toString(),
-            expiredCache.stats().toString());
+    LOG.debug("Expired cache maintenance triggered: cacheStats={}, size={}",
+            expiredCache.stats().toString(), expiredCache.estimatedSize());
   }
 
   /**
@@ -315,41 +308,51 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
   }
 
   /**
-   * A listener that is notified when profiles expire from the active cache.
+   * Notified synchronously when the active cache is modified.
    */
-  private class ActiveCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder>, Serializable {
+  private class ActiveCacheWriter implements CacheWriter<Integer, ProfileBuilder>, Serializable {
+
+    @Override
+    public void write(@Nonnull Integer key, @Nonnull ProfileBuilder value) {
+      // do nothing
+    }
 
     @Override
-    public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) {
-      LOG.warn("Profile expired from active cache; profile={}, entity={}",
-              expired.getDefinition().getProfile(),
-              expired.getEntity());
+    public void delete(@Nonnull Integer key, @Nullable ProfileBuilder value, @Nonnull RemovalCause cause) {
+      if(cause.wasEvicted()) {
+        // add the profile to the expired cache
+        expiredCache.put(key, value);
+        LOG.debug("Profile expired from active cache due to inactivity; profile={}, entity={}, cause={}",
+                value.getDefinition().getProfile(), value.getEntity(), cause);
 
-      // add the profile to the expired cache
-      expiredCache.put(key, expired);
+      } else {
+        LOG.error("Profile removed from cache unexpectedly. File a bug report; profile={}, entity={}, cause={}",
+                value.getDefinition().getProfile(), value.getEntity(), cause);
+      }
     }
   }
 
   /**
-   * A listener that is notified when profiles expire from the active cache.
+   * Notified synchronously when the expired cache is modified.
    */
-  private class ExpiredCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder>, Serializable {
+  private class ExpiredCacheWriter implements CacheWriter<Integer, ProfileBuilder>, Serializable {
+
+    @Override
+    public void write(@Nonnull Integer key, @Nonnull ProfileBuilder value) {
+      // nothing to do
+    }
 
     @Override
-    public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) {
+    public void delete(@Nonnull Integer key, @Nullable ProfileBuilder value, @Nonnull RemovalCause cause) {
       if(cause.wasEvicted()) {
         // the expired profile was NOT flushed in time
         LOG.warn("Expired profile NOT flushed before removal, some state lost; profile={}, entity={}, cause={}",
-                expired.getDefinition().getProfile(),
-                expired.getEntity(),
-                cause);
+                value.getDefinition().getProfile(), value.getEntity(), cause);
 
       } else {
         // the expired profile was flushed successfully
         LOG.debug("Expired profile successfully flushed; profile={}, entity={}, cause={}",
-                expired.getDefinition().getProfile(),
-                expired.getEntity(),
-                cause);
+                value.getDefinition().getProfile(), value.getEntity(), cause);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
index d1b7598..e04404c 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -105,8 +105,7 @@ public class DefaultMessageDistributorTest {
             periodDurationMillis,
             profileTimeToLiveMillis,
             maxNumberOfRoutes,
-            Ticker.systemTicker(),
-            Optional.of(MoreExecutors.sameThreadExecutor()));
+            Ticker.systemTicker());
   }
 
   /**
@@ -199,8 +198,7 @@ public class DefaultMessageDistributorTest {
             periodDurationMillis,
             profileTimeToLiveMillis,
             maxNumberOfRoutes,
-            ticker,
-            Optional.of(MoreExecutors.sameThreadExecutor()));
+            ticker);
 
     // distribute one message
     distributor.distribute(route, context);
@@ -230,8 +228,7 @@ public class DefaultMessageDistributorTest {
             periodDurationMillis,
             profileTimeToLiveMillis,
             maxNumberOfRoutes,
-            ticker,
-            Optional.of(MoreExecutors.sameThreadExecutor()));
+            ticker);
 
     // distribute one message
     distributor.distribute(route, context);
@@ -262,8 +259,7 @@ public class DefaultMessageDistributorTest {
             periodDurationMillis,
             profileTimeToLiveMillis,
             maxNumberOfRoutes,
-            ticker,
-            Optional.of(MoreExecutors.sameThreadExecutor()));
+            ticker);
 
     // distribute one message
     distributor.distribute(route, context);

http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-storm/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml
index cfa30b3..ec2e215 100644
--- a/metron-analytics/metron-profiler-storm/pom.xml
+++ b/metron-analytics/metron-profiler-storm/pom.xml
@@ -36,6 +36,10 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -48,8 +52,8 @@
                     <groupId>javax.servlet</groupId>
                 </exclusion>
                 <exclusion>
-                    <artifactId>commons-httpclient</artifactId>
-                    <groupId>commons-httpclient</groupId>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.slf4j</groupId>
@@ -143,6 +147,26 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-integration-test</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.hamcrest</groupId>
+                    <artifactId>hamcrest-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>com.esotericsoftware</groupId>
             <artifactId>kryo-shaded</artifactId>
             <version>${global_kryo_version}</version>
@@ -271,15 +295,15 @@
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <artifactId>mockito-core</artifactId>
             <version>${global_mockito_version}</version>
             <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.tempus-fugit</groupId>
-            <artifactId>tempus-fugit</artifactId>
-            <version>1.2-20140129.191141-5</version>
-            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.hamcrest</groupId>
+                    <artifactId>hamcrest-core</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>

http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
index f7e75ce..ea4ad4e 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@ -61,9 +61,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import static com.google.code.tempusfugit.temporal.Duration.seconds;
-import static com.google.code.tempusfugit.temporal.Timeout.timeout;
-import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
@@ -77,7 +75,10 @@ import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_ID_FIELD;
 import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_START_FIELD;
 import static org.apache.metron.profiler.storm.KafkaEmitter.PROFILE_FIELD;
 import static org.apache.metron.profiler.storm.KafkaEmitter.TIMESTAMP_FIELD;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -88,6 +89,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler-storm/src/test";
   private static final String FLUX_PATH = "src/main/flux/profiler/remote.yaml";
+  private static final long timeout = TimeUnit.SECONDS.toMillis(90);
 
   public static final long startAt = 10;
   public static final String entity = "10.0.0.1";
@@ -205,32 +207,11 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     Thread.sleep(sleep);
     kafkaComponent.writeMessages(inputTopic, message3);
 
-    // retrieve the profile measurement using PROFILE_GET
-    String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))";
-    List<Integer> measurements = execute(profileGetExpression, List.class);
-
-    // need to keep checking for measurements until the profiler has flushed one out
-    int attempt = 0;
-    while(measurements.size() == 0 && attempt++ < 10) {
-
-      // wait for the profiler to flush
-      sleep = windowDurationMillis;
-      LOG.debug("Waiting {} millis for profiler to flush", sleep);
-      Thread.sleep(sleep);
-
-      // do not write additional messages to advance time. this ensures that we are testing the "time to live"
-      // flush mechanism. the TTL setting defines when the profile will be flushed
-
-      // try again to retrieve the profile measurement
-      measurements = execute(profileGetExpression, List.class);
-    }
-
-    // expect to see only 1 measurement, but could be more (one for each period) depending on
-    // how long we waited for the flush to occur
-    assertTrue(measurements.size() > 0);
-
     // the profile should have counted 3 messages; the 3 test messages that were sent
-    assertEquals(3, measurements.get(0).intValue());
+    assertEventually(() -> {
+      List<Integer> results = execute("PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))", List.class);
+      assertThat(results, hasItem(3));
+    }, timeout);
   }
 
   /**
@@ -278,25 +259,24 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     // wait until the profile flushes both periods.  the first period will flush immediately as subsequent messages
     // advance time.  the next period contains all of the remaining messages, so there are no other messages to
     // advance time.  because of this the next period only flushes after the time-to-live expires
-    waitOrTimeout(() -> profilerTable.getPutLog().size() >= 6, timeout(seconds(90)));
-    {
-      // there are 14 messages in the first period and 12 in the next where ip_src_addr = 192.168.66.1
-      List results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', window)", List.class);
-      assertEquals(14, results.get(0));
-      assertEquals(12, results.get(1));
-    }
-    {
-      // there are 36 messages in the first period and 38 in the next where ip_src_addr = 192.168.138.158
-      List results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', window)", List.class);
-      assertEquals(36, results.get(0));
-      assertEquals(38, results.get(1));
-    }
-    {
-      // in all there are 50 messages in the first period and 50 messages in the next
-      List results = execute("PROFILE_GET('total-count', 'total', window)", List.class);
-      assertEquals(50, results.get(0));
-      assertEquals(50, results.get(1));
-    }
+
+    // there are 14 messages in the first period and 12 in the next where ip_src_addr = 192.168.66.1
+    assertEventually(() -> {
+      List<Integer> results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', window)", List.class);
+      assertThat(results, hasItems(14, 12));
+      }, timeout);
+
+    // there are 36 messages in the first period and 38 in the next where ip_src_addr = 192.168.138.158
+    assertEventually(() -> {
+      List<Integer> results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', window)", List.class);
+      assertThat(results, hasItems(36, 38));
+      }, timeout);
+
+    // in all there are 50 (36+14) messages in the first period and 50 (38+12) messages in the next
+    assertEventually(() -> {
+      List<Integer> results = execute("PROFILE_GET('total-count', 'total', window)", List.class);
+      assertThat(results, hasItems(50, 50));
+      }, timeout);
   }
 
   /**
@@ -332,18 +312,18 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
     kafkaComponent.writeMessages(inputTopic, messages);
 
-    // wait until the profile is flushed
-    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90)));
+    assertEventually(() -> {
+      // validate the measurements written by the batch profiler using `PROFILE_GET`
+      // the 'window' looks up to 5 hours before the max timestamp contained in the test data
+      assign("maxTimestamp", "1530978728982L");
+      assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
 
-    // validate the measurements written by the batch profiler using `PROFILE_GET`
-    // the 'window' looks up to 5 hours before the max timestamp contained in the test data
-    assign("maxTimestamp", "1530978728982L");
-    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
+      // retrieve the stats stored by the profiler
+      List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class);
+      assertTrue(results.size() > 0);
+      assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
 
-    // retrieve the stats stored by the profiler
-    List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class);
-    assertTrue(results.size() > 0);
-    assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
+    }, timeout);
   }
 
   /**
@@ -371,6 +351,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   @Multiline
   private static String profileWithTriageResult;
 
+  private List<byte[]> outputMessages;
   @Test
   public void testProfileWithTriageResult() throws Exception {
     uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithTriageResult));
@@ -381,10 +362,10 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     kafkaComponent.writeMessages(inputTopic, telemetry);
 
     // wait until the triage message is output to kafka
-    waitOrTimeout(() -> kafkaComponent.readMessages(outputTopic).size() > 0, timeout(seconds(90)));
-
-    List<byte[]> outputMessages = kafkaComponent.readMessages(outputTopic);
-    assertEquals(1, outputMessages.size());
+    assertEventually(() -> {
+      outputMessages = kafkaComponent.readMessages(outputTopic);
+      assertEquals(1, outputMessages.size());
+    }, timeout);
 
     // validate the triage message
     JSONObject message = (JSONObject) new JSONParser().parse(new String(outputMessages.get(0), "UTF-8"));

http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
index 0c37a35..026c9fb 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
@@ -38,12 +38,12 @@ public class TestUtils {
   public interface Assertion {
     void apply() throws Exception;
   }
+
   public static void assertEventually(Assertion assertion) throws Exception {
     assertEventually(assertion, MAX_ASSERT_WAIT_MS);
   }
-  private static void assertEventually(Assertion assertion
-                             , long msToWait
-                             ) throws Exception {
+
+  public static void assertEventually(Assertion assertion, long msToWait) throws Exception {
     long delta = msToWait/10;
     for(int i = 0;i < 10;++i) {
       try{