You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:51 UTC
[21/52] [abbrv] metron git commit: METRON-590 Enable Use of Event
Time in Profiler (nickwallen) closes apache/metron#965
METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3083b471
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3083b471
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3083b471
Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 3083b471fe912bc74d27017834e6c80ff177680e
Parents: 46ad9d9
Author: nickwallen <ni...@nickallen.org>
Authored: Tue Mar 20 16:00:20 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Tue Mar 20 16:00:20 2018 -0400
----------------------------------------------------------------------
.../client/stellar/ProfilerFunctions.java | 14 +-
.../profiler/DefaultMessageDistributor.java | 207 +++++++-
.../metron/profiler/DefaultProfileBuilder.java | 110 ++--
.../metron/profiler/MessageDistributor.java | 48 +-
.../apache/metron/profiler/MessageRoute.java | 19 +-
.../apache/metron/profiler/MessageRouter.java | 11 +-
.../apache/metron/profiler/ProfileBuilder.java | 34 +-
.../metron/profiler/ProfileMeasurement.java | 6 +-
.../metron/profiler/StandAloneProfiler.java | 100 +++-
.../org/apache/metron/profiler/clock/Clock.java | 18 +-
.../metron/profiler/clock/ClockFactory.java | 38 ++
.../profiler/clock/DefaultClockFactory.java | 57 ++
.../metron/profiler/clock/EventTimeClock.java | 72 +++
.../metron/profiler/clock/FixedClock.java | 39 +-
.../profiler/clock/FixedClockFactory.java | 44 ++
.../apache/metron/profiler/clock/WallClock.java | 17 +-
.../profiler/DefaultMessageDistributorTest.java | 171 +++++-
.../profiler/DefaultProfileBuilderTest.java | 119 +++--
.../metron/profiler/ProfilePeriodTest.java | 1 -
.../metron/profiler/StandAloneProfilerTest.java | 255 +++++++++
.../profiler/clock/DefaultClockFactoryTest.java | 75 +++
.../profiler/clock/EventTimeClockTest.java | 115 +++++
.../metron/profiler/clock/WallClockTest.java | 54 ++
metron-analytics/metron-profiler/README.md | 98 +++-
.../src/main/config/profiler.properties | 14 +-
.../src/main/flux/profiler/remote.yaml | 42 +-
.../profiler/bolt/DestinationHandler.java | 56 --
.../bolt/FixedFrequencyFlushSignal.java | 126 +++++
.../metron/profiler/bolt/FlushSignal.java | 51 ++
.../profiler/bolt/HBaseDestinationHandler.java | 58 ---
.../metron/profiler/bolt/HBaseEmitter.java | 63 +++
.../profiler/bolt/KafkaDestinationHandler.java | 110 ----
.../metron/profiler/bolt/KafkaEmitter.java | 114 ++++
.../metron/profiler/bolt/ManualFlushSignal.java | 54 ++
.../profiler/bolt/ProfileBuilderBolt.java | 374 +++++++++++---
.../bolt/ProfileMeasurementEmitter.java | 59 +++
.../profiler/bolt/ProfileSplitterBolt.java | 132 ++++-
.../zookeeper/event-time-test/profiler.json | 12 +
.../bolt/FixedFrequencyFlushSignalTest.java | 71 +++
.../bolt/KafkaDestinationHandlerTest.java | 203 --------
.../metron/profiler/bolt/KafkaEmitterTest.java | 208 ++++++++
.../profiler/bolt/ProfileBuilderBoltTest.java | 516 +++++++++++--------
.../profiler/bolt/ProfileHBaseMapperTest.java | 6 +-
.../profiler/bolt/ProfileSplitterBoltTest.java | 288 +++++++++--
.../profiler/integration/MessageBuilder.java | 75 +++
.../integration/ProfilerIntegrationTest.java | 235 ++++++---
.../configuration/metron-profiler-env.xml | 77 ++-
.../package/scripts/params/params_linux.py | 7 +
.../package/templates/profiler.properties.j2 | 15 +-
.../METRON/CURRENT/themes/metron_theme.json | 118 ++++-
.../configuration/profiler/ProfileConfig.java | 53 ++
.../configuration/profiler/ProfilerConfig.java | 48 +-
.../apache/metron/common/utils/JSONUtils.java | 11 +-
.../configurations/ProfilerUpdater.java | 1 +
.../profiler/ProfileConfigTest.java | 5 +-
.../profiler/ProfilerConfigTest.java | 120 +++++
.../integration/components/KafkaComponent.java | 39 +-
57 files changed, 3987 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
index 64c1e2e..d6afe1d 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
@@ -101,7 +101,10 @@ public class ProfilerFunctions {
throw new IllegalArgumentException("Invalid profiler configuration", e);
}
- return new StandAloneProfiler(profilerConfig, periodDurationMillis, context);
+ // the TTL and max routes do not matter here
+ long profileTimeToLiveMillis = Long.MAX_VALUE;
+ long maxNumberOfRoutes = Long.MAX_VALUE;
+ return new StandAloneProfiler(profilerConfig, periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, context);
}
}
@@ -138,13 +141,8 @@ public class ProfilerFunctions {
// user must provide the stand alone profiler
StandAloneProfiler profiler = Util.getArg(1, StandAloneProfiler.class, args);
- try {
- for (JSONObject message : messages) {
- profiler.apply(message);
- }
-
- } catch (ExecutionException e) {
- throw new IllegalArgumentException(format("Failed to apply message; error=%s", e.getMessage()), e);
+ for (JSONObject message : messages) {
+ profiler.apply(message);
}
return profiler;
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index 53377a0..ea5126f 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -20,14 +20,20 @@
package org.apache.metron.profiler;
+import com.google.common.base.Ticker;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.profiler.clock.WallClock;
import org.apache.metron.stellar.dsl.Context;
import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -36,32 +42,81 @@ import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
/**
- * Distributes a message along a MessageRoute. A MessageRoute will lead to one or
- * more ProfileBuilders.
+ * The default implementation of a {@link MessageDistributor}.
+ *
+ * <p>Two caches are maintained; one for active profiles and another for expired
+ * profiles. A profile will remain on the active cache as long as it continues
+ * to receive messages.
+ *
+ * <p>If a profile has not received messages for an extended period of time, it
+ * is expired and moved to the expired cache. A profile that is expired can no
+ * longer receive new messages.
+ *
+ * <p>A profile is stored in the expired cache for a fixed period of time so that
+ * a client can flush the state of expired profiles. If the client does not flush
+ * the expired profiles using `flushExpired`, the state of these profiles will be
+ * lost.
*
- * A ProfileBuilder is responsible for maintaining the state of a single profile,
- * for a single entity. There will be one ProfileBuilder for each (profile, entity) pair.
- * This class ensures that each ProfileBuilder receives the telemetry messages that
- * it needs.
*/
public class DefaultMessageDistributor implements MessageDistributor {
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
/**
* The duration of each profile period in milliseconds.
*/
private long periodDurationMillis;
/**
- * Maintains the state of a profile which is unique to a profile/entity pair.
+ * A cache of active profiles.
+ *
+ * A profile will remain on the active cache as long as it continues to receive
+ * messages. Once it has not received messages for a period of time, it is
+ * moved to the expired cache.
+ */
+ private transient Cache<String, ProfileBuilder> activeCache;
+
+ /**
+ * A cache of expired profiles.
+ *
+ * When a profile expires from the active cache, it is moved here for a
+ * period of time. In the expired cache a profile can no longer receive
+ * new messages. A profile waits on the expired cache so that the client
+ * can flush the state of the expired profile. If the client does not flush
+ * the expired profiles, this state will be lost forever.
*/
- private transient Cache<String, ProfileBuilder> profileCache;
+ private transient Cache<String, ProfileBuilder> expiredCache;
/**
* Create a new message distributor.
+ *
+ * @param periodDurationMillis The period duration in milliseconds.
+ * @param profileTimeToLiveMillis The time-to-live of a profile in milliseconds.
+ * @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser
+ * used routes will be evicted from the internal cache.
+ */
+ public DefaultMessageDistributor(
+ long periodDurationMillis,
+ long profileTimeToLiveMillis,
+ long maxNumberOfRoutes) {
+ this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker());
+ }
+
+ /**
+ * Create a new message distributor.
+ *
* @param periodDurationMillis The period duration in milliseconds.
- * @param profileTimeToLiveMillis The TTL of a profile in milliseconds.
+ * @param profileTimeToLiveMillis The time-to-live of a profile in milliseconds.
+ * @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser
+ * used routes will be evicted from the internal cache.
+ * @param ticker The ticker used to drive time for the caches. Only needs to be set for testing.
*/
- public DefaultMessageDistributor(long periodDurationMillis, long profileTimeToLiveMillis) {
+ public DefaultMessageDistributor(
+ long periodDurationMillis,
+ long profileTimeToLiveMillis,
+ long maxNumberOfRoutes,
+ Ticker ticker) {
+
if(profileTimeToLiveMillis < periodDurationMillis) {
throw new IllegalStateException(format(
"invalid configuration: expect profile TTL (%d) to be greater than period duration (%d)",
@@ -69,9 +124,23 @@ public class DefaultMessageDistributor implements MessageDistributor {
periodDurationMillis));
}
this.periodDurationMillis = periodDurationMillis;
- this.profileCache = CacheBuilder
+
+ // build the cache of active profiles
+ this.activeCache = CacheBuilder
.newBuilder()
+ .maximumSize(maxNumberOfRoutes)
.expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
+ .removalListener(new ActiveCacheRemovalListener())
+ .ticker(ticker)
+ .build();
+
+ // build the cache of expired profiles
+ this.expiredCache = CacheBuilder
+ .newBuilder()
+ .maximumSize(maxNumberOfRoutes)
+ .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
+ .removalListener(new ExpiredCacheRemovalListener())
+ .ticker(ticker)
.build();
}
@@ -79,57 +148,120 @@ public class DefaultMessageDistributor implements MessageDistributor {
* Distribute a message along a MessageRoute.
*
* @param message The message that needs distributed.
+ * @param timestamp The timestamp of the message.
* @param route The message route.
* @param context The Stellar execution context.
* @throws ExecutionException
*/
@Override
- public void distribute(JSONObject message, MessageRoute route, Context context) throws ExecutionException {
- getBuilder(route, context).apply(message);
+ public void distribute(JSONObject message, long timestamp, MessageRoute route, Context context) {
+ try {
+ ProfileBuilder builder = getBuilder(route, context);
+ builder.apply(message, timestamp);
+
+ } catch(ExecutionException e) {
+ LOG.error("Unexpected error", e);
+ throw new RuntimeException(e);
+ }
}
/**
- * Flushes all profiles. Flushes all ProfileBuilders that this distributor is responsible for.
+ * Flush all active profiles.
+ *
+ * <p>A profile will remain active as long as it continues to receive messages. If a profile
+ * does not receive a message for an extended duration, it may be marked as expired.
+ *
+ * <p>Flushes all active {@link ProfileBuilder} objects that this distributor is responsible for.
*
- * @return The profile measurements; one for each (profile, entity) pair.
+ * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair.
*/
@Override
public List<ProfileMeasurement> flush() {
+
+ // cache maintenance needed here to ensure active profiles will expire
+ activeCache.cleanUp();
+ expiredCache.cleanUp();
+
+ List<ProfileMeasurement> measurements = flushCache(activeCache);
+ return measurements;
+ }
+
+ /**
+ * Flush all expired profiles.
+ *
+ * <p>Flushes all expired {@link ProfileBuilder}s that this distributor is responsible for.
+ *
+ * <p>If a profile has not received messages for an extended period of time, it will be marked as
+ * expired. When a profile is expired, it can no longer receive new messages. Expired profiles
+ * remain only to give the client a chance to flush them.
+ *
+ * <p>If the client does not flush the expired profiles periodically, any state maintained in the
+ * profile since the last flush may be lost.
+ *
+ * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair.
+ */
+ @Override
+ public List<ProfileMeasurement> flushExpired() {
+
+ // cache maintenance needed here to ensure active profiles will expire
+ activeCache.cleanUp();
+ expiredCache.cleanUp();
+
+ // flush all expired profiles
+ List<ProfileMeasurement> measurements = flushCache(expiredCache);
+
+ // once the expired profiles have been flushed, they are no longer needed
+ expiredCache.invalidateAll();
+
+ return measurements;
+ }
+
+ /**
+ * Flush all of the profiles maintained in a cache.
+ *
+ * @param cache The cache to flush.
+ * @return The measurements captured when flushing the profiles.
+ */
+ private List<ProfileMeasurement> flushCache(Cache<String, ProfileBuilder> cache) {
+
List<ProfileMeasurement> measurements = new ArrayList<>();
+ for(ProfileBuilder profileBuilder: cache.asMap().values()) {
- profileCache.asMap().forEach((key, profileBuilder) -> {
+ // only need to flush, if the profile has been initialized
if(profileBuilder.isInitialized()) {
+
+ // flush the profiler and save the measurement, if one exists
Optional<ProfileMeasurement> measurement = profileBuilder.flush();
- measurement.ifPresent(measurements::add);
+ measurement.ifPresent(m -> measurements.add(m));
}
- });
+ }
- profileCache.cleanUp();
return measurements;
}
/**
* Retrieves the cached ProfileBuilder that is used to build and maintain the Profile. If none exists,
* one will be created and returned.
+ *
* @param route The message route.
* @param context The Stellar execution context.
*/
public ProfileBuilder getBuilder(MessageRoute route, Context context) throws ExecutionException {
ProfileConfig profile = route.getProfileDefinition();
String entity = route.getEntity();
- return profileCache.get(
+ return activeCache.get(
cacheKey(profile, entity),
() -> new DefaultProfileBuilder.Builder()
.withDefinition(profile)
.withEntity(entity)
.withPeriodDurationMillis(periodDurationMillis)
.withContext(context)
- .withClock(new WallClock())
.build());
}
/**
- * Builds the key that is used to lookup the ProfileState within the cache.
+ * Builds the key that is used to lookup the {@link ProfileBuilder} within the cache.
+ *
* @param profile The profile definition.
* @param entity The entity.
*/
@@ -145,4 +277,33 @@ public class DefaultMessageDistributor implements MessageDistributor {
public DefaultMessageDistributor withPeriodDuration(int duration, TimeUnit units) {
return withPeriodDurationMillis(units.toMillis(duration));
}
+
+ /**
+ * A listener that is notified when profiles expire from the active cache.
+ */
+ private class ActiveCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
+
+ @Override
+ public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
+
+ String key = notification.getKey();
+ ProfileBuilder expired = notification.getValue();
+
+ LOG.warn("Profile expired from active cache; key={}", key);
+ expiredCache.put(key, expired);
+ }
+ }
+
+ /**
+ * A listener that is notified when profiles are removed from the expired cache.
+ */
+ private class ExpiredCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
+
+ @Override
+ public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
+
+ String key = notification.getKey();
+ LOG.debug("Profile removed from expired cache; key={}", key);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
index 2e34160..4b564c9 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
@@ -20,7 +20,18 @@
package org.apache.metron.profiler;
-import static java.lang.String.format;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
@@ -34,20 +45,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.commons.collections4.ListUtils;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.profiler.clock.Clock;
-import org.apache.metron.profiler.clock.WallClock;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
-import org.apache.metron.stellar.common.StellarStatefulExecutor;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
-import org.apache.metron.stellar.dsl.StellarFunctions;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
/**
* Responsible for building and maintaining a Profile.
@@ -94,16 +93,15 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
private long periodDurationMillis;
/**
- * A clock is used to tell time; imagine that.
+ * Tracks the latest timestamp for use when flushing the profile.
*/
- private Clock clock;
+ private long maxTimestamp;
/**
- * Use the ProfileBuilder.Builder to create a new ProfileBuilder.
+ * Private constructor. Use the {@link Builder} to create a new {@link ProfileBuilder}.
*/
private DefaultProfileBuilder(ProfileConfig definition,
String entity,
- Clock clock,
long periodDurationMillis,
Context stellarContext) {
@@ -111,27 +109,37 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
this.definition = definition;
this.profileName = definition.getProfile();
this.entity = entity;
- this.clock = clock;
this.periodDurationMillis = periodDurationMillis;
this.executor = new DefaultStellarStatefulExecutor();
StellarFunctions.initialize(stellarContext);
this.executor.setContext(stellarContext);
+ this.maxTimestamp = 0;
}
/**
* Apply a message to the profile.
+ *
* @param message The message to apply.
+ * @param timestamp The timestamp of the message.
*/
@Override
- public void apply(JSONObject message) {
+ public void apply(JSONObject message, long timestamp) {
try {
if (!isInitialized()) {
+
+ // execute each 'init' expression
assign(definition.getInit(), message, "init");
isInitialized = true;
}
+ // execute each 'update' expression
assign(definition.getUpdate(), message, "update");
+ // keep track of the 'latest' timestamp seen for use when flushing the profile
+ if(timestamp > maxTimestamp) {
+ maxTimestamp = timestamp;
+ }
+
} catch(Throwable e) {
LOG.error(format("Unable to apply message to profile: %s", e.getMessage()), e);
}
@@ -140,23 +148,30 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
/**
* Flush the Profile.
*
- * Completes and emits the ProfileMeasurement. Clears all state in preparation for
+ * <p>Completes and emits the {@link ProfileMeasurement}. Clears all state in preparation for
* the next window period.
*
- * @return Returns the completed profile measurement.
+ * @return Returns the completed {@link ProfileMeasurement}.
*/
@Override
public Optional<ProfileMeasurement> flush() {
- LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity);
- Optional<ProfileMeasurement> result = Optional.empty();
- ProfilePeriod period = new ProfilePeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS);
+
+ Optional<ProfileMeasurement> result;
+ ProfilePeriod period = new ProfilePeriod(maxTimestamp, periodDurationMillis, TimeUnit.MILLISECONDS);
try {
- // execute the 'profile' expression(s)
- Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile");
+ // execute the 'profile' expression
+ String profileExpression = definition
+ .getResult()
+ .getProfileExpressions()
+ .getExpression();
+ Object profileValue = execute(profileExpression, "result/profile");
// execute the 'triage' expression(s)
- Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions()
+ Map<String, Object> triageValues = definition
+ .getResult()
+ .getTriageExpressions()
+ .getExpressions()
.entrySet()
.stream()
.collect(Collectors.toMap(
@@ -185,10 +200,21 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
.withDefinition(definition));
} catch(Throwable e) {
+
// if any of the Stellar expressions fail, a measurement should NOT be returned
LOG.error(format("Unable to flush profile: error=%s", e.getMessage()), e);
+ result = Optional.empty();
}
+ LOG.debug("Flushed profile: profile={}, entity={}, maxTime={}, period={}, start={}, end={}, duration={}",
+ profileName,
+ entity,
+ maxTimestamp,
+ period.getPeriod(),
+ period.getStartTimeMillis(),
+ period.getEndTimeMillis(),
+ period.getDurationMillis());
+
isInitialized = false;
return result;
}
@@ -214,6 +240,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
/**
* Executes an expression contained within the profile definition.
+ *
* @param expression The expression to execute.
* @param transientState Additional transient state provided to the expression.
* @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails.
@@ -232,6 +259,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
/**
* Executes an expression contained within the profile definition.
+ *
* @param expression The expression to execute.
* @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails.
* @return The result of executing the expression.
@@ -242,6 +270,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
/**
* Executes a set of expressions whose results need to be assigned to a variable.
+ *
* @param expressions Maps the name of a variable to the expression whose result should be assigned to it.
* @param transientState Additional transient state provided to the expression.
* @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails.
@@ -254,6 +283,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
String expr = entry.getValue();
try {
+
// assign the result of the expression to the variable
executor.assign(var, expr, transientState);
@@ -274,6 +304,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
/**
* Executes the expressions contained within the profile definition.
+ *
* @param expressions A list of expressions to execute.
* @param transientState Additional transient state provided to the expressions.
* @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails.
@@ -284,6 +315,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
for(String expr: ListUtils.emptyIfNull(expressions)) {
try {
+
// execute an expression
Object result = executor.execute(expr, transientState, Object.class);
results.add(result);
@@ -305,15 +337,19 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
return results;
}
+ @Override
+ public String getEntity() {
+ return entity;
+ }
+
/**
- * A builder used to construct a new ProfileBuilder.
+ * A builder should be used to construct a new {@link ProfileBuilder} object.
*/
public static class Builder {
private ProfileConfig definition;
private String entity;
- private long periodDurationMillis;
- private Clock clock = new WallClock();
+ private Long periodDurationMillis;
private Context context;
public Builder withContext(Context context) {
@@ -321,11 +357,6 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
return this;
}
- public Builder withClock(Clock clock) {
- this.clock = clock;
- return this;
- }
-
/**
* @param definition The profiler definition.
*/
@@ -370,8 +401,11 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
if(StringUtils.isEmpty(entity)) {
throw new IllegalArgumentException(format("missing entity name; got '%s'", entity));
}
+ if(periodDurationMillis == null) {
+ throw new IllegalArgumentException("missing period duration");
+ }
- return new DefaultProfileBuilder(definition, entity, clock, periodDurationMillis, context);
+ return new DefaultProfileBuilder(definition, entity, periodDurationMillis, context);
}
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
index a60446f..ea5be0f 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
@@ -24,33 +24,57 @@ import org.apache.metron.stellar.dsl.Context;
import org.json.simple.JSONObject;
import java.util.List;
-import java.util.concurrent.ExecutionException;
/**
- * Distributes a message along a MessageRoute. A MessageRoute will lead to one or
- * more ProfileBuilders.
+ * Distributes a telemetry message along a {@link MessageRoute}. A {@link MessageRoute} will lead to a
+ * {@link ProfileBuilder} that is responsible for building and maintaining a profile.
*
- * A ProfileBuilder is responsible for maintaining the state of a single profile,
- * for a single entity. There will be one ProfileBuilder for each (profile, entity) pair.
- * This class ensures that each ProfileBuilder receives the telemetry messages that
- * it needs.
+ * <p>A {@link ProfileBuilder} is responsible for maintaining the state of a single (profile, entity)
+ * pairing. There will be one {@link ProfileBuilder} for each (profile, entity) pair.
+ *
+ * <p>A {@link MessageDistributor} ensures that each {@link ProfileBuilder} receives the telemetry
+ * messages that it needs.
+ *
+ * @see MessageRoute
+ * @see ProfileMeasurement
*/
public interface MessageDistributor {
/**
- * Distribute a message along a MessageRoute.
+ * Distribute a message along a {@link MessageRoute}.
*
* @param message The message that needs distributed.
+ * @param timestamp The timestamp of the message.
* @param route The message route.
* @param context The Stellar execution context.
- * @throws ExecutionException
*/
- void distribute(JSONObject message, MessageRoute route, Context context) throws ExecutionException;
+ void distribute(JSONObject message, long timestamp, MessageRoute route, Context context);
/**
- * Flushes all profiles. Flushes all ProfileBuilders that this distributor is responsible for.
+ * Flush all active profiles.
+ *
+ * <p>A profile will remain active as long as it continues to receive messages. If a profile
+ * does not receive a message for an extended duration, it may be marked as expired.
*
- * @return The profile measurements; one for each (profile, entity) pair.
+ * <p>Flushes all active {@link ProfileBuilder} objects that this distributor is responsible for.
+ *
+ * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair.
*/
List<ProfileMeasurement> flush();
+
+ /**
+ * Flush all expired profiles.
+ *
+ * <p>If a profile has not received messages for an extended period of time, it will be marked as
+ * expired. When a profile is expired, it can no longer receive new messages. Expired profiles
+ * remain only to give the client a chance to flush them.
+ *
+ * <p>If the client does not flush the expired profiles periodically, any state maintained in the
+ * profile since the last flush may be lost.
+ *
+ * <p>Flushes all expired {@link ProfileBuilder} objects that this distributor is responsible for.
+ *
+ * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair.
+ */
+ List<ProfileMeasurement> flushExpired();
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
index 1945671..7288f03 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
@@ -23,12 +23,15 @@ package org.apache.metron.profiler;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
/**
- * A MessageRoute defines the profile and entity that a telemetry message needs applied to. This
- * allows a message to be routed to the profile and entity that needs it.
+ * Defines the 'route' a message must take through the Profiler.
*
- * One telemetry message may need multiple routes. This is the case when a message is needed by
- * more than one profile. In this case, there will be multiple MessageRoute objects for a single
- * message.
+ * <p>A {@link MessageRoute} defines the profile and entity that a telemetry message must be applied to.
+ *
+ * <p>If a message is needed by multiple profiles, then multiple {@link MessageRoute} values
+ * will exist. If a message is not needed by any profiles, then no {@link MessageRoute} values
+ * will exist.
+ *
+ * @see MessageRouter
*/
public class MessageRoute {
@@ -42,6 +45,12 @@ public class MessageRoute {
*/
private String entity;
+ /**
+ * Create a {@link MessageRoute}.
+ *
+ * @param profileDefinition The profile definition.
+ * @param entity The entity.
+ */
public MessageRoute(ProfileConfig profileDefinition, String entity) {
this.entity = entity;
this.profileDefinition = profileDefinition;
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
index 99c98a3..4c18062 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
@@ -27,15 +27,18 @@ import org.json.simple.JSONObject;
import java.util.List;
/**
- * Routes incoming telemetry messages.
+ * Routes incoming telemetry messages through the Profiler.
*
- * A single telemetry message may need to take multiple routes. This is the case
- * when a message is needed by more than one profile.
+ * <p>If a message is needed by multiple profiles, then multiple {@link MessageRoute} values
+ * will be returned. If a message is not needed by any profiles, then no {@link MessageRoute} values
+ * will be returned.
+ *
+ * @see MessageRoute
*/
public interface MessageRouter {
/**
- * Route a telemetry message. Finds all routes for a given telemetry message.
+ * Finds all routes for a telemetry message.
*
* @param message The telemetry message that needs routed.
* @param config The configuration for the Profiler.
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
index c09b0b6..07372d7 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
@@ -28,47 +28,61 @@ import java.util.Optional;
/**
* Responsible for building and maintaining a Profile.
*
- * One or more messages are applied to the Profile with `apply` and a profile measurement is
- * produced by calling `flush`.
+ * <p>Telemetry messages are applied to a profile using {@link ProfileBuilder#apply(JSONObject, long)}. A
+ * {@link ProfileMeasurement} is generated by calling {@link ProfileBuilder#flush()}.
*
- * Any one instance is responsible only for building the profile for a specific [profile, entity]
- * pairing. There will exist many instances, one for each [profile, entity] pair that exists
+ * <p>A {@link ProfileBuilder} is responsible only for building the profile for a specific [profile, entity]
+ * pair. There will exist many instances, one for each [profile, entity] pair that exists
* within the incoming telemetry data applied to the profile.
*/
public interface ProfileBuilder {
/**
* Apply a message to the profile.
+ *
* @param message The message to apply.
+ * @param timestamp The timestamp of the message.
*/
- void apply(JSONObject message);
+ void apply(JSONObject message, long timestamp);
/**
* Flush the Profile.
*
- * Completes and emits the ProfileMeasurement. Clears all state in preparation for
+ * <p>Completes the period and returns the {@link ProfileMeasurement}. Clears all state in preparation for
* the next window period.
*
- * @return Returns the completed profile measurement.
+ * @return Returns the {@link ProfileMeasurement}.
*/
Optional<ProfileMeasurement> flush();
/**
- * Has the ProfileBuilder been initialized?
+ * Has the {@link ProfileBuilder} been initialized?
+ *
* @return True, if initialization has occurred. False, otherwise.
*/
boolean isInitialized();
/**
* Returns the definition of the profile being built.
- * @return ProfileConfig definition of the profile
+ *
+ * @return The profile definition.
*/
ProfileConfig getDefinition();
/**
- * Returns the value of a variable being maintained by the builder.
+ * Returns the value of a variable within the current profile state.
+ *
* @param variable The variable name.
* @return The value of the variable.
*/
Object valueOf(String variable);
+
+ /**
+ * Returns the name of the entity.
+ *
+ * <p>Each {@code ProfileBuilder} instance is responsible for one (profile, entity) pair.
+ *
+ * @return The entity.
+ */
+ String getEntity();
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index 0e773e9..f6cc286 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -28,10 +28,10 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
- * Represents a single data point within a Profile.
+ * Represents a single data point within a profile.
*
- * A Profile is effectively a time series. To this end a Profile is composed
- * of many ProfileMeasurement values which in aggregate form a time series.
+ * <p>A profile contains many individual {@link ProfileMeasurement} values captured over a
+ * period of time. These values in aggregate form a time series.
*/
public class ProfileMeasurement {
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
index 6db7079..f79efe6 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
@@ -21,18 +21,29 @@
package org.apache.metron.profiler;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.ClockFactory;
+import org.apache.metron.profiler.clock.DefaultClockFactory;
import org.apache.metron.stellar.dsl.Context;
import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
/**
- * A stand alone version of the Profiler that does not require a
- * distributed execution environment like Apache Storm.
+ * A stand alone version of the Profiler that does not require a distributed
+ * execution environment like Apache Storm.
+ *
+ * <p>This class is used to create and manage profiles within the REPL environment.
*/
public class StandAloneProfiler {
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
/**
* The Stellar execution context.
*/
@@ -54,6 +65,11 @@ public class StandAloneProfiler {
private MessageDistributor distributor;
/**
+ * The factory that creates Clock objects.
+ */
+ private ClockFactory clockFactory;
+
+ /**
* Counts the number of messages that have been applied.
*/
private int messageCount;
@@ -67,12 +83,26 @@ public class StandAloneProfiler {
*/
private int routeCount;
- public StandAloneProfiler(ProfilerConfig config, long periodDurationMillis, Context context) {
+ /**
+ * Create a new Profiler.
+ *
+ * @param config The Profiler configuration.
+ * @param periodDurationMillis The period duration in milliseconds.
+ * @param profileTimeToLiveMillis The time-to-live of a profile in milliseconds.
+ * @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser
+ * used routes will be evicted from the internal cache.
+ * @param context The Stellar execution context.
+ */
+ public StandAloneProfiler(ProfilerConfig config,
+ long periodDurationMillis,
+ long profileTimeToLiveMillis,
+ long maxNumberOfRoutes,
+ Context context) {
this.context = context;
this.config = config;
this.router = new DefaultMessageRouter(context);
- // the period TTL does not matter in this context
- this.distributor = new DefaultMessageDistributor(periodDurationMillis, Long.MAX_VALUE);
+ this.distributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes);
+ this.clockFactory = new DefaultClockFactory();
this.messageCount = 0;
this.routeCount = 0;
}
@@ -80,26 +110,28 @@ public class StandAloneProfiler {
/**
* Apply a message to a set of profiles.
* @param message The message to apply.
- * @throws ExecutionException
*/
- public void apply(JSONObject message) throws ExecutionException {
+ public void apply(JSONObject message) {
- List<MessageRoute> routes = router.route(message, config, context);
- for(MessageRoute route : routes) {
- distributor.distribute(message, route, context);
- }
+ // what time is it?
+ Clock clock = clockFactory.createClock(config);
+ Optional<Long> timestamp = clock.currentTimeMillis(message);
- routeCount += routes.size();
- messageCount += 1;
- }
+ // can only route the message, if we have a timestamp
+ if(timestamp.isPresent()) {
- @Override
- public String toString() {
- return "Profiler{" +
- getProfileCount() + " profile(s), " +
- getMessageCount() + " messages(s), " +
- getRouteCount() + " route(s)" +
- '}';
+ // route the message to the correct profile builders
+ List<MessageRoute> routes = router.route(message, config, context);
+ for (MessageRoute route : routes) {
+ distributor.distribute(message, timestamp.get(), route, context);
+ }
+
+ routeCount += routes.size();
+ messageCount += 1;
+
+ } else {
+ LOG.warn("No timestamp available for the message. The message will be ignored.");
+ }
}
/**
@@ -110,19 +142,45 @@ public class StandAloneProfiler {
return distributor.flush();
}
+ /**
+ * Returns the Profiler configuration.
+ * @return The Profiler configuration.
+ */
public ProfilerConfig getConfig() {
return config;
}
+ /**
+ * Returns the number of defined profiles.
+ * @return The number of defined profiles.
+ */
public int getProfileCount() {
return (config == null) ? 0: config.getProfiles().size();
}
+ /**
+ * Returns the number of messages that have been applied.
+ * @return The number of messages that have been applied.
+ */
public int getMessageCount() {
return messageCount;
}
+ /**
+ * Returns the number of routes.
+ * @return The number of routes.
+ * @see MessageRoute
+ */
public int getRouteCount() {
return routeCount;
}
+
+ @Override
+ public String toString() {
+ return "Profiler{" +
+ getProfileCount() + " profile(s), " +
+ getMessageCount() + " messages(s), " +
+ getRouteCount() + " route(s)" +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
index 6730e49..b07c0ed 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
@@ -20,16 +20,24 @@
package org.apache.metron.profiler.clock;
+import org.json.simple.JSONObject;
+
+import java.util.Optional;
+
/**
- * A clock can tell time; imagine that.
+ * A {@link Clock} manages the progression of time in the Profiler.
*
- * This allows the Profiler to support different treatments of time like wall clock versus event time.
+ * <p>The Profiler can operate on either processing time or event time. This
+ * abstraction deals with the differences between the two.
*/
public interface Clock {
/**
- * The current time in epoch milliseconds.
+ * Returns the current time in epoch milliseconds.
+ *
+ * @param message The telemetry message.
+ * @return An optional value containing the current time in epoch milliseconds, if
+ * the current time is known. Otherwise, empty.
*/
- long currentTimeMillis();
-
+ Optional<Long> currentTimeMillis(JSONObject message);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java
new file mode 100644
index 0000000..5435c48
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+
+/**
+ * A factory for creating {@link Clock} objects.
+ *
+ * <p>The type of {@link Clock} needed will depend on the Profiler configuration.
+ */
+public interface ClockFactory {
+
+ /**
+ * Creates and returns a {@link Clock}.
+ *
+ * @param config The profiler configuration.
+ * @return A {@link Clock}.
+ */
+ Clock createClock(ProfilerConfig config);
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java
new file mode 100644
index 0000000..d62e62b
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+
+/**
+ * Creates a {@link Clock} based on the profiler configuration.
+ *
+ * <p>If the Profiler is configured to use event time, an {@link EventTimeClock} will
+ * be created. Otherwise, a {@link WallClock} will be created.
+ *
+ * <p>The default implementation of a {@link ClockFactory}.
+ */
+public class DefaultClockFactory implements ClockFactory {
+
+ /**
+ * @param config The profiler configuration.
+ * @return The appropriate Clock based on the profiler configuration.
+ */
+ @Override
+ public Clock createClock(ProfilerConfig config) {
+ Clock clock;
+
+ boolean isEventTime = config.getTimestampField().isPresent();
+ if(isEventTime) {
+
+ // using event time
+ String timestampField = config.getTimestampField().get();
+ clock = new EventTimeClock(timestampField);
+
+ } else {
+
+ // using processing time
+ clock = new WallClock();
+ }
+
+ return clock;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java
new file mode 100644
index 0000000..5cd574e
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Optional;
+
+/**
+ * A {@link Clock} that advances based on event time.
+ *
+ * <p>Event time is advanced by the timestamps contained within telemetry messages, rather
+ * than the system clock.
+ */
+public class EventTimeClock implements Clock {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * The name of the field from which the timestamp will be extracted.
+ */
+ private String timestampField;
+
+ /**
+ * @param timestampField The name of the field containing a timestamp.
+ */
+ public EventTimeClock(String timestampField) {
+ this.timestampField = timestampField;
+ }
+
+ @Override
+ public Optional<Long> currentTimeMillis(JSONObject message) {
+
+ Long result;
+ if(message != null && message.containsKey(timestampField)) {
+
+ // extract the timestamp and convert to a long
+ Object timestamp = message.get(timestampField);
+ result = ConversionUtils.convert(timestamp, Long.class);
+
+ } else {
+
+ // the message does not contain the specified timestamp field
+ LOG.debug("message does not contain timestamp field '{}': message will be ignored: message='{}'",
+ timestampField, JSONObject.toJSONString(message));
+ result = null;
+ }
+
+ return Optional.ofNullable(result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
index c6e93cd..8259ed0 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
@@ -20,21 +20,50 @@
package org.apache.metron.profiler.clock;
-import java.io.Serializable;
+import org.json.simple.JSONObject;
+
+import java.util.Optional;
/**
- * A clock that reports whatever time you tell it to. Most useful for testing.
+ * A {@link Clock} that always reports the same time.
+ *
+ * <p>This is only useful for testing.
*/
-public class FixedClock implements Clock, Serializable {
+public class FixedClock implements Clock {
+ /**
+ * The time in milliseconds since the epoch.
+ */
private long epochMillis;
+ /**
+ * Create a {@link Clock}. The time defaults to the epoch.
+ */
+ public FixedClock() {
+ this(0);
+ }
+
+ /**
+ * Create a {@link Clock}.
+ * @param epochMillis The time in milliseconds since the epoch.
+ */
+ public FixedClock(long epochMillis) {
+ this.setTime(epochMillis);
+ }
+
+ /**
+ * Set the current time.
+ * @param epochMillis The time in milliseconds since the epoch.
+ */
public void setTime(long epochMillis) {
this.epochMillis = epochMillis;
}
+ /**
+ * @return The time in milliseconds since the epoch.
+ */
@Override
- public long currentTimeMillis() {
- return this.epochMillis;
+ public Optional<Long> currentTimeMillis(JSONObject message) {
+ return Optional.of(this.epochMillis);
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java
new file mode 100644
index 0000000..b0248cd
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+
+/**
+ * A {@link ClockFactory} that always returns a {@link FixedClock}.
+ *
+ * <p>A {@link FixedClock} always returns the same time and is only useful for testing.
+ */
+public class FixedClockFactory implements ClockFactory {
+
+ private long timestamp;
+
+ /**
+ * @param timestamp The timestamp that all {@link Clock} objects created by this factory will report.
+ */
+ public FixedClockFactory(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public Clock createClock(ProfilerConfig config) {
+ return new FixedClock(timestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
index 1a20c94..20f62e3 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
@@ -20,15 +20,22 @@
package org.apache.metron.profiler.clock;
-import java.io.Serializable;
+import org.json.simple.JSONObject;
+
+import java.util.Optional;
/**
- * A clock that uses the system clock to provide wall clock time.
+ * A {@link Clock} that advances based on system time.
+ *
+ * <p>This {@link Clock} is used to advance time when the Profiler is running
+ * on processing time, rather than event time.
*/
-public class WallClock implements Clock, Serializable {
+public class WallClock implements Clock {
@Override
- public long currentTimeMillis() {
- return System.currentTimeMillis();
+ public Optional<Long> currentTimeMillis(JSONObject message) {
+
+ // the message does not matter; use system time
+ return Optional.of(System.currentTimeMillis());
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
index ff4c289..ea9c5c6 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -20,6 +20,7 @@
package org.apache.metron.profiler;
+import com.google.common.base.Ticker;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.utils.JSONUtils;
@@ -33,6 +34,9 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static org.junit.Assert.assertEquals;
public class DefaultMessageDistributorTest {
@@ -83,16 +87,22 @@ public class DefaultMessageDistributorTest {
private DefaultMessageDistributor distributor;
private Context context;
+ private long periodDurationMillis = MINUTES.toMillis(15);
+ private long profileTimeToLiveMillis = MINUTES.toMillis(30);
+ private long maxNumberOfRoutes = Long.MAX_VALUE;
@Before
public void setup() throws Exception {
+
context = Context.EMPTY_CONTEXT();
JSONParser parser = new JSONParser();
messageOne = (JSONObject) parser.parse(inputOne);
messageTwo = (JSONObject) parser.parse(inputTwo);
+
distributor = new DefaultMessageDistributor(
- TimeUnit.MINUTES.toMillis(15),
- TimeUnit.MINUTES.toMillis(30));
+ periodDurationMillis,
+ profileTimeToLiveMillis,
+ maxNumberOfRoutes);
}
/**
@@ -108,15 +118,18 @@ public class DefaultMessageDistributorTest {
*/
@Test
public void testDistribute() throws Exception {
+
+ // setup
+ long timestamp = 100;
ProfileConfig definition = createDefinition(profileOne);
String entity = (String) messageOne.get("ip_src_addr");
MessageRoute route = new MessageRoute(definition, entity);
- // distribute one message
- distributor.distribute(messageOne, route, context);
+ // distribute one message and flush
+ distributor.distribute(messageOne, timestamp, route, context);
+ List<ProfileMeasurement> measurements = distributor.flush();
// expect one measurement coming from one profile
- List<ProfileMeasurement> measurements = distributor.flush();
assertEquals(1, measurements.size());
ProfileMeasurement m = measurements.get(0);
assertEquals(definition.getProfile(), m.getProfileName());
@@ -126,12 +139,17 @@ public class DefaultMessageDistributorTest {
@Test
public void testDistributeWithTwoProfiles() throws Exception {
- // distribute one message to the first profile
+ // setup
+ long timestamp = 100;
String entity = (String) messageOne.get("ip_src_addr");
- distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entity), context);
+
+ // distribute one message to the first profile
+ MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity);
+ distributor.distribute(messageOne, timestamp, routeOne, context);
// distribute another message to the second profile, but same entity
- distributor.distribute(messageOne, new MessageRoute(createDefinition(profileTwo), entity), context);
+ MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity);
+ distributor.distribute(messageOne, timestamp, routeTwo, context);
// expect 2 measurements; 1 for each profile
List<ProfileMeasurement> measurements = distributor.flush();
@@ -141,17 +159,150 @@ public class DefaultMessageDistributorTest {
@Test
public void testDistributeWithTwoEntities() throws Exception {
+ // setup
+ long timestamp = 100;
+
// distribute one message
String entityOne = (String) messageOne.get("ip_src_addr");
- distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entityOne), context);
+ MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne);
+ distributor.distribute(messageOne, timestamp, routeOne, context);
// distribute another message with a different entity
String entityTwo = (String) messageTwo.get("ip_src_addr");
- distributor.distribute(messageTwo, new MessageRoute(createDefinition(profileTwo), entityTwo), context);
+ MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entityTwo);
+ distributor.distribute(messageTwo, timestamp, routeTwo, context);
// expect 2 measurements; 1 for each entity
List<ProfileMeasurement> measurements = distributor.flush();
assertEquals(2, measurements.size());
}
+ /**
+ * A profile should expire after a fixed period of time. This test ensures that
+ * profiles are not expired before they are supposed to be.
+ */
+ @Test
+ public void testNotYetTimeToExpireProfiles() throws Exception {
+
+ // the ticker drives time to allow us to test cache expiration
+ FixedTicker ticker = new FixedTicker();
+
+ // setup
+ ProfileConfig definition = createDefinition(profileOne);
+ String entity = (String) messageOne.get("ip_src_addr");
+ MessageRoute route = new MessageRoute(definition, entity);
+ distributor = new DefaultMessageDistributor(
+ periodDurationMillis,
+ profileTimeToLiveMillis,
+ maxNumberOfRoutes,
+ ticker);
+
+ // distribute one message
+ distributor.distribute(messageOne, 1000000, route, context);
+
+ // advance time to just shy of the profile TTL
+ ticker.advanceTime(profileTimeToLiveMillis - 1000, MILLISECONDS);
+
+ // the profile should NOT have expired yet
+ assertEquals(0, distributor.flushExpired().size());
+ assertEquals(1, distributor.flush().size());
+ }
+
+ /**
+ * A profile should expire after a fixed period of time.
+ */
+ @Test
+ public void testProfilesShouldExpire() throws Exception {
+
+ // the ticker drives time to allow us to test cache expiration
+ FixedTicker ticker = new FixedTicker();
+
+ // setup
+ ProfileConfig definition = createDefinition(profileOne);
+ String entity = (String) messageOne.get("ip_src_addr");
+ MessageRoute route = new MessageRoute(definition, entity);
+ distributor = new DefaultMessageDistributor(
+ periodDurationMillis,
+ profileTimeToLiveMillis,
+ maxNumberOfRoutes,
+ ticker);
+
+ // distribute one message
+ distributor.distribute(messageOne, 100000, route, context);
+
+ // advance time to just beyond the profile TTL
+ ticker.advanceTime(profileTimeToLiveMillis + 1000, MILLISECONDS);
+
+ // the profile should have expired by now
+ assertEquals(1, distributor.flushExpired().size());
+ assertEquals(0, distributor.flush().size());
+ }
+
+ /**
+ * An expired profile is only kept around for a fixed period of time. It should be removed if it
+ * has been in the expired cache for too long.
+ */
+ @Test
+ public void testExpiredProfilesShouldBeRemoved() throws Exception {
+
+ // the ticker drives time to allow us to test cache expiration
+ FixedTicker ticker = new FixedTicker();
+
+ // setup
+ ProfileConfig definition = createDefinition(profileOne);
+ String entity = (String) messageOne.get("ip_src_addr");
+ MessageRoute route = new MessageRoute(definition, entity);
+ distributor = new DefaultMessageDistributor(
+ periodDurationMillis,
+ profileTimeToLiveMillis,
+ maxNumberOfRoutes,
+ ticker);
+
+ // distribute one message
+ distributor.distribute(messageOne, 1000000, route, context);
+
+ // advance time a couple of hours
+ ticker.advanceTime(2, HOURS);
+
+ // the profile should have expired by now, so a regular flush returns nothing
+ assertEquals(0, distributor.flush().size());
+
+ // advance time a couple more hours
+ ticker.advanceTime(2, HOURS);
+
+ // the profile should have been removed from the expired cache
+ assertEquals(0, distributor.flushExpired().size());
+ }
+
+ /**
+ * An implementation of Ticker that can be used to drive time
+ * when testing the Guava caches.
+ */
+ private class FixedTicker extends Ticker {
+
+ /**
+ * The time that will be reported.
+ */
+ private long timestampNanos;
+
+ public FixedTicker() {
+ this.timestampNanos = Ticker.systemTicker().read();
+ }
+
+ public FixedTicker startAt(long timestampNanos) {
+ this.timestampNanos = timestampNanos;
+ return this;
+ }
+
+ public FixedTicker advanceTime(long time, TimeUnit units) {
+ this.timestampNanos += units.toNanos(time);
+ return this;
+ }
+
+ @Override
+ public long read() {
+ return this.timestampNanos;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
index d25b7ff..24eb5f8 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
@@ -23,8 +23,6 @@ package org.apache.metron.profiler;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.profiler.clock.Clock;
-import org.apache.metron.profiler.clock.FixedClock;
import org.apache.metron.stellar.dsl.Context;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -82,7 +80,9 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testInit() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -92,7 +92,7 @@ public class DefaultProfileBuilderTest {
.build();
// execute
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
@@ -106,7 +106,9 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testInitWithNoMessage() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -146,7 +148,9 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testUpdate() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -158,7 +162,12 @@ public class DefaultProfileBuilderTest {
// execute
int count = 10;
for(int i=0; i<count; i++) {
- builder.apply(message);
+
+ // apply the message
+ builder.apply(message, timestamp);
+
+ // advance time
+ timestamp += 5;
}
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
@@ -183,7 +192,9 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testResult() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -193,7 +204,7 @@ public class DefaultProfileBuilderTest {
.build();
// execute
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
@@ -206,40 +217,38 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testProfilePeriodOnFlush() throws Exception {
- // setup
- FixedClock clock = new FixedClock();
- clock.setTime(100);
+ // setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
.withEntity("10.0.0.1")
.withPeriodDuration(10, TimeUnit.MINUTES)
.withContext(Context.EMPTY_CONTEXT())
- .withClock(clock)
.build();
{
// apply a message and flush
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
// validate the profile period
- ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+ ProfilePeriod expected = new ProfilePeriod(timestamp, 10, TimeUnit.MINUTES);
assertEquals(expected, m.get().getPeriod());
}
{
- // advance time by at least one period - 10 minutes
- clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
+ // advance time by exactly one period - 10 minutes
+ timestamp += TimeUnit.MINUTES.toMillis(10);
// apply a message and flush again
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
// validate the profile period
- ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+ ProfilePeriod expected = new ProfilePeriod(timestamp, 10, TimeUnit.MINUTES);
assertEquals(expected, m.get().getPeriod());
}
}
@@ -262,7 +271,9 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testGroupBy() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -272,7 +283,7 @@ public class DefaultProfileBuilderTest {
.build();
// execute
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
@@ -300,23 +311,20 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testStateAvailableToGroupBy() throws Exception {
- FixedClock clock = new FixedClock();
- clock.setTime(1503081070340L);
- long periodDurationMillis = TimeUnit.MINUTES.toMillis(10);
- ProfilePeriod period = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
// setup
+ long timestamp = 1503081070340L;
+ ProfilePeriod period = new ProfilePeriod(timestamp, 10, TimeUnit.MINUTES);
definition = JSONUtils.INSTANCE.load(testStateAvailableToGroupBy, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
.withEntity("10.0.0.1")
.withPeriodDuration(10, TimeUnit.MINUTES)
.withContext(Context.EMPTY_CONTEXT())
- .withClock(clock)
.build();
// execute
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
@@ -350,7 +358,9 @@ public class DefaultProfileBuilderTest {
@Test
public void testFlushDoesNotClearsState() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -362,16 +372,24 @@ public class DefaultProfileBuilderTest {
// execute - accumulate some state then flush it
int count = 10;
for(int i=0; i<count; i++) {
- builder.apply(message);
+
+ // apply the message
+ builder.apply(message, timestamp);
+
+ // advance time
+ timestamp += 5;
}
builder.flush();
+ // advance time beyond the current period
+ timestamp += TimeUnit.MINUTES.toMillis(20);
+
// apply another message to accumulate new state, then flush again to validate original state was cleared
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
- assertTrue(m.isPresent());
// validate
+ assertTrue(m.isPresent());
assertEquals(33, m.get().getProfileValue());
}
@@ -395,7 +413,9 @@ public class DefaultProfileBuilderTest {
@Test
public void testFlushDoesNotClearsStateButInitDoes() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testFlushProfileWithNaiveInit, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -407,18 +427,27 @@ public class DefaultProfileBuilderTest {
// execute - accumulate some state then flush it
int count = 10;
for(int i=0; i<count; i++) {
- builder.apply(message);
+
+ // apply a message
+ builder.apply(message, timestamp);
+
+ // advance time
+ timestamp += 5;
}
builder.flush();
+ // advance time beyond the current period
+ timestamp += TimeUnit.MINUTES.toMillis(20);
+
// apply another message to accumulate new state, then flush again to validate original state was cleared
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
// validate
assertEquals(3, m.get().getProfileValue());
}
+
/**
* {
* "profile": "test",
@@ -434,7 +463,9 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testEntity() throws Exception {
+
// setup
+ long timestamp = 100;
final String entity = "10.0.0.1";
definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
@@ -445,7 +476,7 @@ public class DefaultProfileBuilderTest {
.build();
// execute
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
@@ -473,7 +504,9 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testResultWithProfileExpression() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -483,7 +516,7 @@ public class DefaultProfileBuilderTest {
.build();
// execute
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
@@ -515,7 +548,9 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testResultWithTriageExpression() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -525,7 +560,7 @@ public class DefaultProfileBuilderTest {
.build();
// execute
- builder.apply(message);
+ builder.apply(message, timestamp);
Optional<ProfileMeasurement> m = builder.flush();
assertTrue(m.isPresent());
@@ -550,7 +585,9 @@ public class DefaultProfileBuilderTest {
@Test
public void testBadInitExpression() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badInitProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -560,7 +597,7 @@ public class DefaultProfileBuilderTest {
.build();
// due to the bad expression, there should be no result
- builder.apply(message);
+ builder.apply(message, timestamp);
assertFalse(builder.flush().isPresent());
}
@@ -579,7 +616,9 @@ public class DefaultProfileBuilderTest {
@Test
public void testBadResultExpression() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badSimpleResultProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -589,7 +628,7 @@ public class DefaultProfileBuilderTest {
.build();
// due to the bad expression, there should be no result
- builder.apply(message);
+ builder.apply(message, timestamp);
assertFalse(builder.flush().isPresent());
}
@@ -608,7 +647,9 @@ public class DefaultProfileBuilderTest {
@Test
public void testBadGroupByExpression() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badGroupByProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -618,7 +659,7 @@ public class DefaultProfileBuilderTest {
.build();
// due to the bad expression, there should be no result
- builder.apply(message);
+ builder.apply(message, timestamp);
assertFalse(builder.flush().isPresent());
}
@@ -641,7 +682,9 @@ public class DefaultProfileBuilderTest {
@Test
public void testBadResultProfileExpression() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badResultProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -651,7 +694,7 @@ public class DefaultProfileBuilderTest {
.build();
// due to the bad expression, there should be no result
- builder.apply(message);
+ builder.apply(message, timestamp);
assertFalse(builder.flush().isPresent());
}
@@ -674,7 +717,9 @@ public class DefaultProfileBuilderTest {
@Test
public void testBadResultTriageExpression() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badResultTriage, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -684,7 +729,7 @@ public class DefaultProfileBuilderTest {
.build();
// due to the bad expression, there should be no result
- builder.apply(message);
+ builder.apply(message, timestamp);
assertFalse(builder.flush().isPresent());
}
@@ -707,7 +752,9 @@ public class DefaultProfileBuilderTest {
*/
@Test
public void testBadUpdateExpression() throws Exception {
+
// setup
+ long timestamp = 100;
definition = JSONUtils.INSTANCE.load(badUpdateProfile, ProfileConfig.class);
builder = new DefaultProfileBuilder.Builder()
.withDefinition(definition)
@@ -717,7 +764,7 @@ public class DefaultProfileBuilderTest {
.build();
// execute
- builder.apply(message);
+ builder.apply(message, timestamp);
// if the update expression fails, the profile should still flush.
Optional<ProfileMeasurement> m = builder.flush();
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
index 3a51ea4..1a72111 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
@@ -20,7 +20,6 @@
package org.apache.metron.profiler;
-import org.apache.metron.profiler.ProfilePeriod;
import org.junit.Test;
import java.util.concurrent.TimeUnit;