You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/12/07 19:25:17 UTC
metron git commit: METRON-1810 Storm Profiler Intermittent Test Failure (nickwallen) closes apache/metron#1289
Repository: metron
Updated Branches:
refs/heads/master e81a5c102 -> d749961da
METRON-1810 Storm Profiler Intermittent Test Failure (nickwallen) closes apache/metron#1289
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d749961d
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d749961d
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d749961d
Branch: refs/heads/master
Commit: d749961daaa77624e1cb6878548479d86c8fe668
Parents: e81a5c1
Author: nickwallen <ni...@nickallen.org>
Authored: Fri Dec 7 14:24:48 2018 -0500
Committer: nickallen <ni...@apache.org>
Committed: Fri Dec 7 14:24:48 2018 -0500
----------------------------------------------------------------------
.../profiler/DefaultMessageDistributor.java | 75 +++++++-------
.../profiler/DefaultMessageDistributorTest.java | 12 +--
metron-analytics/metron-profiler-storm/pom.xml | 42 ++++++--
.../integration/ProfilerIntegrationTest.java | 103 ++++++++-----------
.../metron/integration/utils/TestUtils.java | 6 +-
5 files changed, 121 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index 0e50467..ef2ff2c 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -21,11 +21,11 @@
package org.apache.metron.profiler;
import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.CacheWriter;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Ticker;
-import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.stellar.dsl.Context;
@@ -41,7 +41,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -105,7 +104,7 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
long periodDurationMillis,
long profileTimeToLiveMillis,
long maxNumberOfRoutes) {
- this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker(), Optional.empty());
+ this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker());
}
/**
@@ -116,14 +115,12 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
* @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser
* used routes will be evicted from the internal cache.
* @param ticker The ticker used to drive time for the caches. Only needs set for testing.
- * @param cacheMaintenanceExecutor The executor responsible for running cache maintenance tasks. Only needed for testing.
*/
public DefaultMessageDistributor(
long periodDurationMillis,
long profileTimeToLiveMillis,
long maxNumberOfRoutes,
- Ticker ticker,
- Optional<Executor> cacheMaintenanceExecutor) {
+ Ticker ticker) {
if(profileTimeToLiveMillis < periodDurationMillis) {
throw new IllegalStateException(format(
@@ -138,11 +135,8 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
.newBuilder()
.maximumSize(maxNumberOfRoutes)
.expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
- .removalListener(new ActiveCacheRemovalListener())
- .ticker(ticker);
- if (cacheMaintenanceExecutor.isPresent()) {
- activeCacheBuilder.executor(cacheMaintenanceExecutor.get());
- }
+ .ticker(ticker)
+ .writer(new ActiveCacheWriter());
if (LOG.isDebugEnabled()) {
activeCacheBuilder.recordStats();
}
@@ -153,11 +147,8 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
.newBuilder()
.maximumSize(maxNumberOfRoutes)
.expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
- .removalListener(new ExpiredCacheRemovalListener())
- .ticker(ticker);
- if (cacheMaintenanceExecutor.isPresent()) {
- expiredCacheBuilder.executor(cacheMaintenanceExecutor.get());
- }
+ .ticker(ticker)
+ .writer(new ExpiredCacheWriter());
if (LOG.isDebugEnabled()) {
expiredCacheBuilder.recordStats();
}
@@ -238,10 +229,12 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
*/
private void cacheMaintenance() {
activeCache.cleanUp();
+ LOG.debug("Active cache maintenance triggered: cacheStats={}, size={}",
+ activeCache.stats().toString(), activeCache.estimatedSize());
+
expiredCache.cleanUp();
- LOG.debug("Cache maintenance triggered: activeCacheStats={}, expiredCacheStats={}",
- activeCache.stats().toString(),
- expiredCache.stats().toString());
+ LOG.debug("Expired cache maintenance triggered: cacheStats={}, size={}",
+ expiredCache.stats().toString(), expiredCache.estimatedSize());
}
/**
@@ -315,41 +308,51 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
}
/**
- * A listener that is notified when profiles expire from the active cache.
+ * Notified synchronously when the active cache is modified.
*/
- private class ActiveCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder>, Serializable {
+ private class ActiveCacheWriter implements CacheWriter<Integer, ProfileBuilder>, Serializable {
+
+ @Override
+ public void write(@Nonnull Integer key, @Nonnull ProfileBuilder value) {
+ // do nothing
+ }
@Override
- public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) {
- LOG.warn("Profile expired from active cache; profile={}, entity={}",
- expired.getDefinition().getProfile(),
- expired.getEntity());
+ public void delete(@Nonnull Integer key, @Nullable ProfileBuilder value, @Nonnull RemovalCause cause) {
+ if(cause.wasEvicted()) {
+ // add the profile to the expired cache
+ expiredCache.put(key, value);
+ LOG.debug("Profile expired from active cache due to inactivity; profile={}, entity={}, cause={}",
+ value.getDefinition().getProfile(), value.getEntity(), cause);
- // add the profile to the expired cache
- expiredCache.put(key, expired);
+ } else {
+ LOG.error("Profile removed from cache unexpectedly. File a bug report; profile={}, entity={}, cause={}",
+ value.getDefinition().getProfile(), value.getEntity(), cause);
+ }
}
}
/**
- * A listener that is notified when profiles expire from the active cache.
+ * Notified synchronously when the expired cache is modified.
*/
- private class ExpiredCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder>, Serializable {
+ private class ExpiredCacheWriter implements CacheWriter<Integer, ProfileBuilder>, Serializable {
+
+ @Override
+ public void write(@Nonnull Integer key, @Nonnull ProfileBuilder value) {
+ // nothing to do
+ }
@Override
- public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) {
+ public void delete(@Nonnull Integer key, @Nullable ProfileBuilder value, @Nonnull RemovalCause cause) {
if(cause.wasEvicted()) {
// the expired profile was NOT flushed in time
LOG.warn("Expired profile NOT flushed before removal, some state lost; profile={}, entity={}, cause={}",
- expired.getDefinition().getProfile(),
- expired.getEntity(),
- cause);
+ value.getDefinition().getProfile(), value.getEntity(), cause);
} else {
// the expired profile was flushed successfully
LOG.debug("Expired profile successfully flushed; profile={}, entity={}, cause={}",
- expired.getDefinition().getProfile(),
- expired.getEntity(),
- cause);
+ value.getDefinition().getProfile(), value.getEntity(), cause);
}
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
index d1b7598..e04404c 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -105,8 +105,7 @@ public class DefaultMessageDistributorTest {
periodDurationMillis,
profileTimeToLiveMillis,
maxNumberOfRoutes,
- Ticker.systemTicker(),
- Optional.of(MoreExecutors.sameThreadExecutor()));
+ Ticker.systemTicker());
}
/**
@@ -199,8 +198,7 @@ public class DefaultMessageDistributorTest {
periodDurationMillis,
profileTimeToLiveMillis,
maxNumberOfRoutes,
- ticker,
- Optional.of(MoreExecutors.sameThreadExecutor()));
+ ticker);
// distribute one message
distributor.distribute(route, context);
@@ -230,8 +228,7 @@ public class DefaultMessageDistributorTest {
periodDurationMillis,
profileTimeToLiveMillis,
maxNumberOfRoutes,
- ticker,
- Optional.of(MoreExecutors.sameThreadExecutor()));
+ ticker);
// distribute one message
distributor.distribute(route, context);
@@ -262,8 +259,7 @@ public class DefaultMessageDistributorTest {
periodDurationMillis,
profileTimeToLiveMillis,
maxNumberOfRoutes,
- ticker,
- Optional.of(MoreExecutors.sameThreadExecutor()));
+ ticker);
// distribute one message
distributor.distribute(route, context);
http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-storm/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml
index cfa30b3..ec2e215 100644
--- a/metron-analytics/metron-profiler-storm/pom.xml
+++ b/metron-analytics/metron-profiler-storm/pom.xml
@@ -36,6 +36,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -48,8 +52,8 @@
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
- <artifactId>commons-httpclient</artifactId>
- <groupId>commons-httpclient</groupId>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -143,6 +147,26 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-integration-test</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
<version>${global_kryo_version}</version>
@@ -271,15 +295,15 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <artifactId>mockito-core</artifactId>
<version>${global_mockito_version}</version>
<scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.google.code.tempus-fugit</groupId>
- <artifactId>tempus-fugit</artifactId>
- <version>1.2-20140129.191141-5</version>
- <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
index f7e75ce..ea4ad4e 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@ -61,9 +61,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import static com.google.code.tempusfugit.temporal.Duration.seconds;
-import static com.google.code.tempusfugit.temporal.Timeout.timeout;
-import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
+import static org.apache.metron.integration.utils.TestUtils.assertEventually;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
@@ -77,7 +75,10 @@ import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_ID_FIELD;
import static org.apache.metron.profiler.storm.KafkaEmitter.PERIOD_START_FIELD;
import static org.apache.metron.profiler.storm.KafkaEmitter.PROFILE_FIELD;
import static org.apache.metron.profiler.storm.KafkaEmitter.TIMESTAMP_FIELD;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -88,6 +89,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String TEST_RESOURCES = "../../metron-analytics/metron-profiler-storm/src/test";
private static final String FLUX_PATH = "src/main/flux/profiler/remote.yaml";
+ private static final long timeout = TimeUnit.SECONDS.toMillis(90);
public static final long startAt = 10;
public static final String entity = "10.0.0.1";
@@ -205,32 +207,11 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
Thread.sleep(sleep);
kafkaComponent.writeMessages(inputTopic, message3);
- // retrieve the profile measurement using PROFILE_GET
- String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))";
- List<Integer> measurements = execute(profileGetExpression, List.class);
-
- // need to keep checking for measurements until the profiler has flushed one out
- int attempt = 0;
- while(measurements.size() == 0 && attempt++ < 10) {
-
- // wait for the profiler to flush
- sleep = windowDurationMillis;
- LOG.debug("Waiting {} millis for profiler to flush", sleep);
- Thread.sleep(sleep);
-
- // do not write additional messages to advance time. this ensures that we are testing the "time to live"
- // flush mechanism. the TTL setting defines when the profile will be flushed
-
- // try again to retrieve the profile measurement
- measurements = execute(profileGetExpression, List.class);
- }
-
- // expect to see only 1 measurement, but could be more (one for each period) depending on
- // how long we waited for the flush to occur
- assertTrue(measurements.size() > 0);
-
// the profile should have counted 3 messages; the 3 test messages that were sent
- assertEquals(3, measurements.get(0).intValue());
+ assertEventually(() -> {
+ List<Integer> results = execute("PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))", List.class);
+ assertThat(results, hasItem(3));
+ }, timeout);
}
/**
@@ -278,25 +259,24 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
// wait until the profile flushes both periods. the first period will flush immediately as subsequent messages
// advance time. the next period contains all of the remaining messages, so there are no other messages to
// advance time. because of this the next period only flushes after the time-to-live expires
- waitOrTimeout(() -> profilerTable.getPutLog().size() >= 6, timeout(seconds(90)));
- {
- // there are 14 messages in the first period and 12 in the next where ip_src_addr = 192.168.66.1
- List results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', window)", List.class);
- assertEquals(14, results.get(0));
- assertEquals(12, results.get(1));
- }
- {
- // there are 36 messages in the first period and 38 in the next where ip_src_addr = 192.168.138.158
- List results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', window)", List.class);
- assertEquals(36, results.get(0));
- assertEquals(38, results.get(1));
- }
- {
- // in all there are 50 messages in the first period and 50 messages in the next
- List results = execute("PROFILE_GET('total-count', 'total', window)", List.class);
- assertEquals(50, results.get(0));
- assertEquals(50, results.get(1));
- }
+
+ // there are 14 messages in the first period and 12 in the next where ip_src_addr = 192.168.66.1
+ assertEventually(() -> {
+ List<Integer> results = execute("PROFILE_GET('count-by-ip', '192.168.66.1', window)", List.class);
+ assertThat(results, hasItems(14, 12));
+ }, timeout);
+
+ // there are 36 messages in the first period and 38 in the next where ip_src_addr = 192.168.138.158
+ assertEventually(() -> {
+ List<Integer> results = execute("PROFILE_GET('count-by-ip', '192.168.138.158', window)", List.class);
+ assertThat(results, hasItems(36, 38));
+ }, timeout);
+
+ // in all there are 50 (36+14) messages in the first period and 50 (38+12) messages in the next
+ assertEventually(() -> {
+ List<Integer> results = execute("PROFILE_GET('total-count', 'total', window)", List.class);
+ assertThat(results, hasItems(50, 50));
+ }, timeout);
}
/**
@@ -332,18 +312,18 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
List<String> messages = FileUtils.readLines(new File("src/test/resources/telemetry.json"));
kafkaComponent.writeMessages(inputTopic, messages);
- // wait until the profile is flushed
- waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90)));
+ assertEventually(() -> {
+ // validate the measurements written by the batch profiler using `PROFILE_GET`
+ // the 'window' looks up to 5 hours before the max timestamp contained in the test data
+ assign("maxTimestamp", "1530978728982L");
+ assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
- // validate the measurements written by the batch profiler using `PROFILE_GET`
- // the 'window' looks up to 5 hours before the max timestamp contained in the test data
- assign("maxTimestamp", "1530978728982L");
- assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
+ // retrieve the stats stored by the profiler
+ List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class);
+ assertTrue(results.size() > 0);
+ assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
- // retrieve the stats stored by the profiler
- List results = execute("PROFILE_GET('profile-with-stats', 'global', window)", List.class);
- assertTrue(results.size() > 0);
- assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
+ }, timeout);
}
/**
@@ -371,6 +351,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@Multiline
private static String profileWithTriageResult;
+ private List<byte[]> outputMessages;
@Test
public void testProfileWithTriageResult() throws Exception {
uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithTriageResult));
@@ -381,10 +362,10 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
kafkaComponent.writeMessages(inputTopic, telemetry);
// wait until the triage message is output to kafka
- waitOrTimeout(() -> kafkaComponent.readMessages(outputTopic).size() > 0, timeout(seconds(90)));
-
- List<byte[]> outputMessages = kafkaComponent.readMessages(outputTopic);
- assertEquals(1, outputMessages.size());
+ assertEventually(() -> {
+ outputMessages = kafkaComponent.readMessages(outputTopic);
+ assertEquals(1, outputMessages.size());
+ }, timeout);
// validate the triage message
JSONObject message = (JSONObject) new JSONParser().parse(new String(outputMessages.get(0), "UTF-8"));
http://git-wip-us.apache.org/repos/asf/metron/blob/d749961d/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
index 0c37a35..026c9fb 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java
@@ -38,12 +38,12 @@ public class TestUtils {
public interface Assertion {
void apply() throws Exception;
}
+
public static void assertEventually(Assertion assertion) throws Exception {
assertEventually(assertion, MAX_ASSERT_WAIT_MS);
}
- private static void assertEventually(Assertion assertion
- , long msToWait
- ) throws Exception {
+
+ public static void assertEventually(Assertion assertion, long msToWait) throws Exception {
long delta = msToWait/10;
for(int i = 0;i < 10;++i) {
try{