You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/04/27 19:29:53 UTC

[13/50] [abbrv] metron git commit: METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965

METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3083b471
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3083b471
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3083b471

Branch: refs/heads/feature/METRON-1416-upgrade-solr
Commit: 3083b471fe912bc74d27017834e6c80ff177680e
Parents: 46ad9d9
Author: nickwallen <ni...@nickallen.org>
Authored: Tue Mar 20 16:00:20 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Tue Mar 20 16:00:20 2018 -0400

----------------------------------------------------------------------
 .../client/stellar/ProfilerFunctions.java       |  14 +-
 .../profiler/DefaultMessageDistributor.java     | 207 +++++++-
 .../metron/profiler/DefaultProfileBuilder.java  | 110 ++--
 .../metron/profiler/MessageDistributor.java     |  48 +-
 .../apache/metron/profiler/MessageRoute.java    |  19 +-
 .../apache/metron/profiler/MessageRouter.java   |  11 +-
 .../apache/metron/profiler/ProfileBuilder.java  |  34 +-
 .../metron/profiler/ProfileMeasurement.java     |   6 +-
 .../metron/profiler/StandAloneProfiler.java     | 100 +++-
 .../org/apache/metron/profiler/clock/Clock.java |  18 +-
 .../metron/profiler/clock/ClockFactory.java     |  38 ++
 .../profiler/clock/DefaultClockFactory.java     |  57 ++
 .../metron/profiler/clock/EventTimeClock.java   |  72 +++
 .../metron/profiler/clock/FixedClock.java       |  39 +-
 .../profiler/clock/FixedClockFactory.java       |  44 ++
 .../apache/metron/profiler/clock/WallClock.java |  17 +-
 .../profiler/DefaultMessageDistributorTest.java | 171 +++++-
 .../profiler/DefaultProfileBuilderTest.java     | 119 +++--
 .../metron/profiler/ProfilePeriodTest.java      |   1 -
 .../metron/profiler/StandAloneProfilerTest.java | 255 +++++++++
 .../profiler/clock/DefaultClockFactoryTest.java |  75 +++
 .../profiler/clock/EventTimeClockTest.java      | 115 +++++
 .../metron/profiler/clock/WallClockTest.java    |  54 ++
 metron-analytics/metron-profiler/README.md      |  98 +++-
 .../src/main/config/profiler.properties         |  14 +-
 .../src/main/flux/profiler/remote.yaml          |  42 +-
 .../profiler/bolt/DestinationHandler.java       |  56 --
 .../bolt/FixedFrequencyFlushSignal.java         | 126 +++++
 .../metron/profiler/bolt/FlushSignal.java       |  51 ++
 .../profiler/bolt/HBaseDestinationHandler.java  |  58 ---
 .../metron/profiler/bolt/HBaseEmitter.java      |  63 +++
 .../profiler/bolt/KafkaDestinationHandler.java  | 110 ----
 .../metron/profiler/bolt/KafkaEmitter.java      | 114 ++++
 .../metron/profiler/bolt/ManualFlushSignal.java |  54 ++
 .../profiler/bolt/ProfileBuilderBolt.java       | 374 +++++++++++---
 .../bolt/ProfileMeasurementEmitter.java         |  59 +++
 .../profiler/bolt/ProfileSplitterBolt.java      | 132 ++++-
 .../zookeeper/event-time-test/profiler.json     |  12 +
 .../bolt/FixedFrequencyFlushSignalTest.java     |  71 +++
 .../bolt/KafkaDestinationHandlerTest.java       | 203 --------
 .../metron/profiler/bolt/KafkaEmitterTest.java  | 208 ++++++++
 .../profiler/bolt/ProfileBuilderBoltTest.java   | 516 +++++++++++--------
 .../profiler/bolt/ProfileHBaseMapperTest.java   |   6 +-
 .../profiler/bolt/ProfileSplitterBoltTest.java  | 288 +++++++++--
 .../profiler/integration/MessageBuilder.java    |  75 +++
 .../integration/ProfilerIntegrationTest.java    | 235 ++++++---
 .../configuration/metron-profiler-env.xml       |  77 ++-
 .../package/scripts/params/params_linux.py      |   7 +
 .../package/templates/profiler.properties.j2    |  15 +-
 .../METRON/CURRENT/themes/metron_theme.json     | 118 ++++-
 .../configuration/profiler/ProfileConfig.java   |  53 ++
 .../configuration/profiler/ProfilerConfig.java  |  48 +-
 .../apache/metron/common/utils/JSONUtils.java   |  11 +-
 .../configurations/ProfilerUpdater.java         |   1 +
 .../profiler/ProfileConfigTest.java             |   5 +-
 .../profiler/ProfilerConfigTest.java            | 120 +++++
 .../integration/components/KafkaComponent.java  |  39 +-
 57 files changed, 3987 insertions(+), 1096 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
index 64c1e2e..d6afe1d 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerFunctions.java
@@ -101,7 +101,10 @@ public class ProfilerFunctions {
         throw new IllegalArgumentException("Invalid profiler configuration", e);
       }
 
-      return new StandAloneProfiler(profilerConfig, periodDurationMillis, context);
+      // the TTL and max routes do not matter here
+      long profileTimeToLiveMillis = Long.MAX_VALUE;
+      long maxNumberOfRoutes = Long.MAX_VALUE;
+      return new StandAloneProfiler(profilerConfig, periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, context);
     }
   }
 
@@ -138,13 +141,8 @@ public class ProfilerFunctions {
 
       // user must provide the stand alone profiler
       StandAloneProfiler profiler = Util.getArg(1, StandAloneProfiler.class, args);
-      try {
-        for (JSONObject message : messages) {
-          profiler.apply(message);
-        }
-
-      } catch (ExecutionException e) {
-        throw new IllegalArgumentException(format("Failed to apply message; error=%s", e.getMessage()), e);
+      for (JSONObject message : messages) {
+        profiler.apply(message);
       }
 
       return profiler;

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index 53377a0..ea5126f 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -20,14 +20,20 @@
 
 package org.apache.metron.profiler;
 
+import com.google.common.base.Ticker;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.profiler.clock.WallClock;
 import org.apache.metron.stellar.dsl.Context;
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -36,32 +42,81 @@ import java.util.concurrent.TimeUnit;
 import static java.lang.String.format;
 
 /**
- * Distributes a message along a MessageRoute.  A MessageRoute will lead to one or
- * more ProfileBuilders.
+ * The default implementation of a {@link MessageDistributor}.
+ *
+ * <p>Two caches are maintained; one for active profiles and another for expired
+ * profiles.  A profile will remain on the active cache as long as it continues
+ * to receive messages.
+ *
+ * <p>If a profile has not received messages for an extended period of time, it
+ * is expired and moved to the expired cache.  A profile that is expired can no
+ * longer receive new messages.
+ *
+ * <p>A profile is stored in the expired cache for a fixed period of time so that
+ * a client can flush the state of expired profiles.  If the client does not flush
+ * the expired profiles using `flushExpired`, the state of these profiles will be
+ * lost.
  *
- * A ProfileBuilder is responsible for maintaining the state of a single profile,
- * for a single entity.  There will be one ProfileBuilder for each (profile, entity) pair.
- * This class ensures that each ProfileBuilder receives the telemetry messages that
- * it needs.
  */
 public class DefaultMessageDistributor implements MessageDistributor {
 
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   /**
    * The duration of each profile period in milliseconds.
    */
   private long periodDurationMillis;
 
   /**
-   * Maintains the state of a profile which is unique to a profile/entity pair.
+   * A cache of active profiles.
+   *
+   * A profile will remain on the active cache as long as it continues to receive
+   * messages.  Once it has not received messages for a period of time, it is
+   * moved to the expired cache.
+   */
+  private transient Cache<String, ProfileBuilder> activeCache;
+
+  /**
+   * A cache of expired profiles.
+   *
+   * When a profile expires from the active cache, it is moved here for a
+   * period of time.  In the expired cache a profile can no longer receive
+   * new messages.  A profile waits on the expired cache so that the client
+   * can flush the state of the expired profile.  If the client does not flush
+   * the expired profiles, this state will be lost forever.
    */
-  private transient Cache<String, ProfileBuilder> profileCache;
+  private transient Cache<String, ProfileBuilder> expiredCache;
 
   /**
    * Create a new message distributor.
+   *
+   * @param periodDurationMillis The period duration in milliseconds.
+   * @param profileTimeToLiveMillis The time-to-live of a profile in milliseconds.
+   * @param maxNumberOfRoutes The max number of unique routes to maintain.  After this is exceeded, lesser
+   *                          used routes will be evicted from the internal cache.
+   */
+  public DefaultMessageDistributor(
+          long periodDurationMillis,
+          long profileTimeToLiveMillis,
+          long maxNumberOfRoutes) {
+    this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker());
+  }
+
+  /**
+   * Create a new message distributor.
+   *
    * @param periodDurationMillis The period duration in milliseconds.
-   * @param profileTimeToLiveMillis The TTL of a profile in milliseconds.
+   * @param profileTimeToLiveMillis The time-to-live of a profile in milliseconds.
+   * @param maxNumberOfRoutes The max number of unique routes to maintain.  After this is exceeded, lesser
+   *                          used routes will be evicted from the internal cache.
+   * @param ticker The ticker used to drive time for the caches.  Only needs set for testing.
    */
-  public DefaultMessageDistributor(long periodDurationMillis, long profileTimeToLiveMillis) {
+  public DefaultMessageDistributor(
+          long periodDurationMillis,
+          long profileTimeToLiveMillis,
+          long maxNumberOfRoutes,
+          Ticker ticker) {
+
     if(profileTimeToLiveMillis < periodDurationMillis) {
       throw new IllegalStateException(format(
               "invalid configuration: expect profile TTL (%d) to be greater than period duration (%d)",
@@ -69,9 +124,23 @@ public class DefaultMessageDistributor implements MessageDistributor {
               periodDurationMillis));
     }
     this.periodDurationMillis = periodDurationMillis;
-    this.profileCache = CacheBuilder
+
+    // build the cache of active profiles
+    this.activeCache = CacheBuilder
             .newBuilder()
+            .maximumSize(maxNumberOfRoutes)
             .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
+            .removalListener(new ActiveCacheRemovalListener())
+            .ticker(ticker)
+            .build();
+
+    // build the cache of expired profiles
+    this.expiredCache = CacheBuilder
+            .newBuilder()
+            .maximumSize(maxNumberOfRoutes)
+            .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
+            .removalListener(new ExpiredCacheRemovalListener())
+            .ticker(ticker)
             .build();
   }
 
@@ -79,57 +148,120 @@ public class DefaultMessageDistributor implements MessageDistributor {
    * Distribute a message along a MessageRoute.
    *
    * @param message The message that needs distributed.
+   * @param timestamp The timestamp of the message.
    * @param route The message route.
    * @param context The Stellar execution context.
    * @throws ExecutionException
    */
   @Override
-  public void distribute(JSONObject message, MessageRoute route, Context context) throws ExecutionException {
-    getBuilder(route, context).apply(message);
+  public void distribute(JSONObject message, long timestamp, MessageRoute route, Context context) {
+    try {
+      ProfileBuilder builder = getBuilder(route, context);
+      builder.apply(message, timestamp);
+
+    } catch(ExecutionException e) {
+      LOG.error("Unexpected error", e);
+      throw new RuntimeException(e);
+    }
   }
 
   /**
-   * Flushes all profiles.  Flushes all ProfileBuilders that this distributor is responsible for.
+   * Flush all active profiles.
+   *
+   * <p>A profile will remain active as long as it continues to receive messages.  If a profile
+   * does not receive a message for an extended duration, it may be marked as expired.
+   *
+   * <p>Flushes all active {@link ProfileBuilder} objects that this distributor is responsible for.
    *
-   * @return The profile measurements; one for each (profile, entity) pair.
+   * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair.
    */
   @Override
   public List<ProfileMeasurement> flush() {
+
+    // cache maintenance needed here to ensure active profiles will expire
+    activeCache.cleanUp();
+    expiredCache.cleanUp();
+
+    List<ProfileMeasurement> measurements = flushCache(activeCache);
+    return measurements;
+  }
+
+  /**
+   * Flush all expired profiles.
+   *
+   * <p>Flushes all expired {@link ProfileBuilder}s that this distributor is responsible for.
+   *
+   * <p>If a profile has not received messages for an extended period of time, it will be marked as
+   * expired.  When a profile is expired, it can no longer receive new messages.  Expired profiles
+   * remain only to give the client a chance to flush them.
+   *
+   * <p>If the client does not flush the expired profiles periodically, any state maintained in the
+   * profile since the last flush may be lost.
+   *
+   * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair.
+   */
+  @Override
+  public List<ProfileMeasurement> flushExpired() {
+
+    // cache maintenance needed here to ensure active profiles will expire
+    activeCache.cleanUp();
+    expiredCache.cleanUp();
+
+    // flush all expired profiles
+    List<ProfileMeasurement> measurements = flushCache(expiredCache);
+
+    // once the expired profiles have been flushed, they are no longer needed
+    expiredCache.invalidateAll();
+
+    return measurements;
+  }
+
+  /**
+   * Flush all of the profiles maintained in a cache.
+   *
+   * @param cache The cache to flush.
+   * @return The measurements captured when flushing the profiles.
+   */
+  private List<ProfileMeasurement> flushCache(Cache<String, ProfileBuilder> cache) {
+
     List<ProfileMeasurement> measurements = new ArrayList<>();
+    for(ProfileBuilder profileBuilder: cache.asMap().values()) {
 
-    profileCache.asMap().forEach((key, profileBuilder) -> {
+      // only need to flush, if the profile has been initialized
       if(profileBuilder.isInitialized()) {
+
+        // flush the profiler and save the measurement, if one exists
         Optional<ProfileMeasurement> measurement = profileBuilder.flush();
-        measurement.ifPresent(measurements::add);
+        measurement.ifPresent(m -> measurements.add(m));
       }
-    });
+    }
 
-    profileCache.cleanUp();
     return measurements;
   }
 
   /**
    * Retrieves the cached ProfileBuilder that is used to build and maintain the Profile.  If none exists,
    * one will be created and returned.
+   *
    * @param route The message route.
    * @param context The Stellar execution context.
    */
   public ProfileBuilder getBuilder(MessageRoute route, Context context) throws ExecutionException {
     ProfileConfig profile = route.getProfileDefinition();
     String entity = route.getEntity();
-    return profileCache.get(
+    return activeCache.get(
             cacheKey(profile, entity),
             () -> new DefaultProfileBuilder.Builder()
                     .withDefinition(profile)
                     .withEntity(entity)
                     .withPeriodDurationMillis(periodDurationMillis)
                     .withContext(context)
-                    .withClock(new WallClock())
                     .build());
   }
 
   /**
-   * Builds the key that is used to lookup the ProfileState within the cache.
+   * Builds the key that is used to lookup the {@link ProfileBuilder} within the cache.
+   *
    * @param profile The profile definition.
    * @param entity The entity.
    */
@@ -145,4 +277,33 @@ public class DefaultMessageDistributor implements MessageDistributor {
   public DefaultMessageDistributor withPeriodDuration(int duration, TimeUnit units) {
     return withPeriodDurationMillis(units.toMillis(duration));
   }
+
+  /**
+   * A listener that is notified when profiles expire from the active cache.
+   */
+  private class ActiveCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
+
+    @Override
+    public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
+
+      String key = notification.getKey();
+      ProfileBuilder expired = notification.getValue();
+
+      LOG.warn("Profile expired from active cache; key={}", key);
+      expiredCache.put(key, expired);
+    }
+  }
+
+  /**
+   * A listener that is notified when profiles expire from the active cache.
+   */
+  private class ExpiredCacheRemovalListener implements RemovalListener<String, ProfileBuilder> {
+
+    @Override
+    public void onRemoval(RemovalNotification<String, ProfileBuilder> notification) {
+
+      String key = notification.getKey();
+      LOG.debug("Profile removed from expired cache; key={}", key);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
index 2e34160..4b564c9 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
@@ -20,7 +20,18 @@
 
 package org.apache.metron.profiler;
 
-import static java.lang.String.format;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
@@ -34,20 +45,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-import org.apache.commons.collections4.ListUtils;
-import org.apache.commons.collections4.MapUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.profiler.clock.Clock;
-import org.apache.metron.profiler.clock.WallClock;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
-import org.apache.metron.stellar.common.StellarStatefulExecutor;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
-import org.apache.metron.stellar.dsl.StellarFunctions;
-import org.json.simple.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
 
 /**
  * Responsible for building and maintaining a Profile.
@@ -94,16 +93,15 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
   private long periodDurationMillis;
 
   /**
-   * A clock is used to tell time; imagine that.
+   * Tracks the latest timestamp for use when flushing the profile.
    */
-  private Clock clock;
+  private long maxTimestamp;
 
   /**
-   * Use the ProfileBuilder.Builder to create a new ProfileBuilder.
+   * Private constructor.  Use the {@link Builder} to create a new {@link ProfileBuilder).
    */
   private DefaultProfileBuilder(ProfileConfig definition,
                                 String entity,
-                                Clock clock,
                                 long periodDurationMillis,
                                 Context stellarContext) {
 
@@ -111,27 +109,37 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
     this.definition = definition;
     this.profileName = definition.getProfile();
     this.entity = entity;
-    this.clock = clock;
     this.periodDurationMillis = periodDurationMillis;
     this.executor = new DefaultStellarStatefulExecutor();
     StellarFunctions.initialize(stellarContext);
     this.executor.setContext(stellarContext);
+    this.maxTimestamp = 0;
   }
 
   /**
    * Apply a message to the profile.
+   *
    * @param message The message to apply.
+   * @param timestamp The timestamp of the message.
    */
   @Override
-  public void apply(JSONObject message) {
+  public void apply(JSONObject message, long timestamp) {
     try {
       if (!isInitialized()) {
+
+        // execute each 'init' expression
         assign(definition.getInit(), message, "init");
         isInitialized = true;
       }
 
+      // execute each 'update' expression
       assign(definition.getUpdate(), message, "update");
 
+      // keep track of the 'latest' timestamp seen for use when flushing the profile
+      if(timestamp > maxTimestamp) {
+        maxTimestamp = timestamp;
+      }
+
     } catch(Throwable e) {
       LOG.error(format("Unable to apply message to profile: %s", e.getMessage()), e);
     }
@@ -140,23 +148,30 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
   /**
    * Flush the Profile.
    *
-   * Completes and emits the ProfileMeasurement.  Clears all state in preparation for
+   * <p>Completes and emits the {@link ProfileMeasurement}.  Clears all state in preparation for
    * the next window period.
    *
-   * @return Returns the completed profile measurement.
+   * @return Returns the completed {@link ProfileMeasurement}.
    */
   @Override
   public Optional<ProfileMeasurement> flush() {
-    LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity);
-    Optional<ProfileMeasurement> result = Optional.empty();
-    ProfilePeriod period = new ProfilePeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS);
+
+    Optional<ProfileMeasurement> result;
+    ProfilePeriod period = new ProfilePeriod(maxTimestamp, periodDurationMillis, TimeUnit.MILLISECONDS);
 
     try {
-      // execute the 'profile' expression(s)
-      Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile");
+      // execute the 'profile' expression
+      String profileExpression = definition
+              .getResult()
+              .getProfileExpressions()
+              .getExpression();
+      Object profileValue = execute(profileExpression, "result/profile");
 
       // execute the 'triage' expression(s)
-      Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions()
+      Map<String, Object> triageValues = definition
+              .getResult()
+              .getTriageExpressions()
+              .getExpressions()
               .entrySet()
               .stream()
               .collect(Collectors.toMap(
@@ -185,10 +200,21 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
               .withDefinition(definition));
 
     } catch(Throwable e) {
+
       // if any of the Stellar expressions fail, a measurement should NOT be returned
       LOG.error(format("Unable to flush profile: error=%s", e.getMessage()), e);
+      result = Optional.empty();
     }
 
+    LOG.debug("Flushed profile: profile={}, entity={}, maxTime={}, period={}, start={}, end={}, duration={}",
+            profileName,
+            entity,
+            maxTimestamp,
+            period.getPeriod(),
+            period.getStartTimeMillis(),
+            period.getEndTimeMillis(),
+            period.getDurationMillis());
+
     isInitialized = false;
     return result;
   }
@@ -214,6 +240,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
 
   /**
    * Executes an expression contained within the profile definition.
+   *
    * @param expression The expression to execute.
    * @param transientState Additional transient state provided to the expression.
    * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
@@ -232,6 +259,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
 
   /**
    * Executes an expression contained within the profile definition.
+   *
    * @param expression The expression to execute.
    * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
    * @return The result of executing the expression.
@@ -242,6 +270,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
 
   /**
    * Executes a set of expressions whose results need to be assigned to a variable.
+   *
    * @param expressions Maps the name of a variable to the expression whose result should be assigned to it.
    * @param transientState Additional transient state provided to the expression.
    * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
@@ -254,6 +283,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
       String expr = entry.getValue();
 
       try {
+
         // assign the result of the expression to the variable
         executor.assign(var, expr, transientState);
 
@@ -274,6 +304,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
 
   /**
    * Executes the expressions contained within the profile definition.
+   *
    * @param expressions A list of expressions to execute.
    * @param transientState Additional transient state provided to the expressions.
    * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
@@ -284,6 +315,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
 
     for(String expr: ListUtils.emptyIfNull(expressions)) {
       try {
+
         // execute an expression
         Object result = executor.execute(expr, transientState, Object.class);
         results.add(result);
@@ -305,15 +337,19 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
     return results;
   }
 
+  @Override
+  public String getEntity() {
+    return entity;
+  }
+
   /**
-   * A builder used to construct a new ProfileBuilder.
+   * A builder should be used to construct a new {@link ProfileBuilder} object.
    */
   public static class Builder {
 
     private ProfileConfig definition;
     private String entity;
-    private long periodDurationMillis;
-    private Clock clock = new WallClock();
+    private Long periodDurationMillis;
     private Context context;
 
     public Builder withContext(Context context) {
@@ -321,11 +357,6 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
       return this;
     }
 
-    public Builder withClock(Clock clock) {
-      this.clock = clock;
-      return this;
-    }
-
     /**
      * @param definition The profiler definition.
      */
@@ -370,8 +401,11 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable {
       if(StringUtils.isEmpty(entity)) {
         throw new IllegalArgumentException(format("missing entity name; got '%s'", entity));
       }
+      if(periodDurationMillis == null) {
+        throw new IllegalArgumentException("missing period duration");
+      }
 
-      return new DefaultProfileBuilder(definition, entity, clock, periodDurationMillis, context);
+      return new DefaultProfileBuilder(definition, entity, periodDurationMillis, context);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
index a60446f..ea5be0f 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
@@ -24,33 +24,57 @@ import org.apache.metron.stellar.dsl.Context;
 import org.json.simple.JSONObject;
 
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
 /**
- * Distributes a message along a MessageRoute.  A MessageRoute will lead to one or
- * more ProfileBuilders.
+ * Distributes a telemetry message along a {@link MessageRoute}. A {@link MessageRoute} will lead to a
+ * {@link ProfileBuilder} that is responsible for building and maintaining a profile.
  *
- * A ProfileBuilder is responsible for maintaining the state of a single profile,
- * for a single entity.  There will be one ProfileBuilder for each (profile, entity) pair.
- * This class ensures that each ProfileBuilder receives the telemetry messages that
- * it needs.
+ * <p>A {@link ProfileBuilder} is responsible for maintaining the state of a single (profile, entity)
+ * pairing.  There will be one {@link ProfileBuilder} for each (profile, entity) pair.
+ *
+ * <p>A {@link MessageDistributor} ensures that each {@link ProfileBuilder} receives the telemetry
+ * messages that it needs.
+ *
+ * @see MessageRoute
+ * @see ProfileMeasurement
  */
 public interface MessageDistributor {
 
   /**
-   * Distribute a message along a MessageRoute.
+   * Distribute a message along a {@link MessageRoute}.
    *
    * @param message The message that needs distributed.
+   * @param timestamp The timestamp of the message.
    * @param route The message route.
    * @param context The Stellar execution context.
-   * @throws ExecutionException
    */
-  void distribute(JSONObject message, MessageRoute route, Context context) throws ExecutionException;
+  void distribute(JSONObject message, long timestamp, MessageRoute route, Context context);
 
   /**
-   * Flushes all profiles.  Flushes all ProfileBuilders that this distributor is responsible for.
+   * Flush all active profiles.
+   *
+   * <p>A profile will remain active as long as it continues to receive messages.  If a profile
+   * does not receive a message for an extended duration, it may be marked as expired.
    *
-   * @return The profile measurements; one for each (profile, entity) pair.
+   * <p>Flushes all active {@link ProfileBuilder} objects that this distributor is responsible for.
+   *
+   * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair.
    */
   List<ProfileMeasurement> flush();
+
+  /**
+   * Flush all expired profiles.
+   *
+   * <p>If a profile has not received messages for an extended period of time, it will be marked as
+   * expired.  When a profile is expired, it can no longer receive new messages.  Expired profiles
+   * remain only to give the client a chance to flush them.
+   *
+   * <p>If the client does not flush the expired profiles periodically, any state maintained in the
+   * profile since the last flush may be lost.
+   *
+   * <p>Flushes all expired {@link ProfileBuilder} objects that this distributor is responsible for.
+   *
+   * @return The {@link ProfileMeasurement} values; one for each (profile, entity) pair.
+   */
+  List<ProfileMeasurement> flushExpired();
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
index 1945671..7288f03 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
@@ -23,12 +23,15 @@ package org.apache.metron.profiler;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 
 /**
- * A MessageRoute defines the profile and entity that a telemetry message needs applied to.  This
- * allows a message to be routed to the profile and entity that needs it.
+ * Defines the 'route' a message must take through the Profiler.
  *
- * One telemetry message may need multiple routes.  This is the case when a message is needed by
- * more than one profile.  In this case, there will be multiple MessageRoute objects for a single
- * message.
+ * <p>A {@link MessageRoute} defines the profile and entity that a telemetry message needs applied to.
+ *
+ * <p>If a message is needed by multiple profiles, then multiple {@link MessageRoute} values
+ * will exist.  If a message is not needed by any profiles, then no {@link MessageRoute} values
+ * will exist.
+ *
+ * @see MessageRouter
  */
 public class MessageRoute {
 
@@ -42,6 +45,12 @@ public class MessageRoute {
    */
   private String entity;
 
+  /**
+   * Create a {@link MessageRoute}.
+   *
+   * @param profileDefinition The profile definition.
+   * @param entity The entity.
+   */
   public MessageRoute(ProfileConfig profileDefinition, String entity) {
     this.entity = entity;
     this.profileDefinition = profileDefinition;

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
index 99c98a3..4c18062 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRouter.java
@@ -27,15 +27,18 @@ import org.json.simple.JSONObject;
 import java.util.List;
 
 /**
- * Routes incoming telemetry messages.
+ * Routes incoming telemetry messages through the Profiler.
  *
- * A single telemetry message may need to take multiple routes.  This is the case
- * when a message is needed by more than one profile.
+ * <p>If a message is needed by multiple profiles, then multiple {@link MessageRoute} values
+ * will be returned.  If a message is not needed by any profiles, then no {@link MessageRoute} values
+ * will be returned.
+ *
+ * @see MessageRoute
  */
 public interface MessageRouter {
 
   /**
-   * Route a telemetry message.  Finds all routes for a given telemetry message.
+   * Finds all routes for a telemetry message.
    *
    * @param message The telemetry message that needs routed.
    * @param config The configuration for the Profiler.

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
index c09b0b6..07372d7 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
@@ -28,47 +28,61 @@ import java.util.Optional;
 /**
  * Responsible for building and maintaining a Profile.
  *
- * One or more messages are applied to the Profile with `apply` and a profile measurement is
- * produced by calling `flush`.
+ * <p>Telemetry messages are applied to a profile using {@link ProfileBuilder#apply(JSONObject, long)}.  A
+ * {@link ProfileMeasurement} is generated by calling {@link ProfileBuilder#flush()}.
  *
- * Any one instance is responsible only for building the profile for a specific [profile, entity]
- * pairing.  There will exist many instances, one for each [profile, entity] pair that exists
+ * A {@link ProfileBuilder} is responsible only for building the profile for a specific [profile, entity]
+ * pair.  There will exist many instances, one for each [profile, entity] pair that exists
  * within the incoming telemetry data applied to the profile.
  */
 public interface ProfileBuilder {
 
   /**
    * Apply a message to the profile.
+   *
    * @param message The message to apply.
+   * @param timestamp The timestamp of the message.
    */
-  void apply(JSONObject message);
+  void apply(JSONObject message, long timestamp);
 
   /**
    * Flush the Profile.
    *
-   * Completes and emits the ProfileMeasurement.  Clears all state in preparation for
+   * <p>Completes the period and returns the {@link ProfileMeasurement}.  Clears all state in preparation for
    * the next window period.
    *
-   * @return Returns the completed profile measurement.
+   * @return Returns the {@link ProfileMeasurement}.
    */
   Optional<ProfileMeasurement> flush();
 
   /**
-   * Has the ProfileBuilder been initialized?
+   * Has the {@link ProfileBuilder} been initialized?
+   *
    * @return True, if initialization has occurred.  False, otherwise.
    */
   boolean isInitialized();
 
   /**
    * Returns the definition of the profile being built.
-   * @return ProfileConfig definition of the profile
+   *
+   * @return The profile definition.
    */
   ProfileConfig getDefinition();
 
   /**
-   * Returns the value of a variable being maintained by the builder.
+   * Returns the value of a variable within the current profile state.
+   *
    * @param variable The variable name.
    * @return The value of the variable.
    */
   Object valueOf(String variable);
+
+  /**
+   * Returns the name of the entity.
+   *
+   * <p>Each {@code ProfileBuilder} instance is responsible for one (profile, entity) pair.
+   *
+   * @return The entity.
+   */
+  String getEntity();
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index 0e773e9..f6cc286 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -28,10 +28,10 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Represents a single data point within a Profile.
+ * Represents a single data point within a profile.
  *
- * A Profile is effectively a time series.  To this end a Profile is composed
- * of many ProfileMeasurement values which in aggregate form a time series.
+ * <p>A profile contains many individual {@link ProfileMeasurement} values captured over a
+ * period of time.  These values in aggregate form a time series.
  */
 public class ProfileMeasurement {
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
index 6db7079..f79efe6 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
@@ -21,18 +21,29 @@
 package org.apache.metron.profiler;
 
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.ClockFactory;
+import org.apache.metron.profiler.clock.DefaultClockFactory;
 import org.apache.metron.stellar.dsl.Context;
 import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
 /**
- * A stand alone version of the Profiler that does not require a
- * distributed execution environment like Apache Storm.
+ * A stand alone version of the Profiler that does not require a distributed
+ * execution environment like Apache Storm.
+ *
+ * <p>This class is used to create and manage profiles within the REPL environment.
  */
 public class StandAloneProfiler {
 
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   /**
    * The Stellar execution context.
    */
@@ -54,6 +65,11 @@ public class StandAloneProfiler {
   private MessageDistributor distributor;
 
   /**
+   * The factory that creates Clock objects.
+   */
+  private ClockFactory clockFactory;
+
+  /**
    * Counts the number of messages that have been applied.
    */
   private int messageCount;
@@ -67,12 +83,26 @@ public class StandAloneProfiler {
    */
   private int routeCount;
 
-  public StandAloneProfiler(ProfilerConfig config, long periodDurationMillis, Context context) {
+  /**
+   * Create a new Profiler.
+   *
+   * @param config The Profiler configuration.
+   * @param periodDurationMillis The period duration in milliseconds.
+   * @param profileTimeToLiveMillis The time-to-live of a profile in milliseconds.
+   * @param maxNumberOfRoutes The max number of unique routes to maintain.  After this is exceeded, lesser
+   *                          used routes will be evicted from the internal cache.
+   * @param context The Stellar execution context.
+   */
+  public StandAloneProfiler(ProfilerConfig config,
+                            long periodDurationMillis,
+                            long profileTimeToLiveMillis,
+                            long maxNumberOfRoutes,
+                            Context context) {
     this.context = context;
     this.config = config;
     this.router = new DefaultMessageRouter(context);
-    // the period TTL does not matter in this context
-    this.distributor = new DefaultMessageDistributor(periodDurationMillis, Long.MAX_VALUE);
+    this.distributor = new DefaultMessageDistributor(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes);
+    this.clockFactory = new DefaultClockFactory();
     this.messageCount = 0;
     this.routeCount = 0;
   }
@@ -80,26 +110,28 @@ public class StandAloneProfiler {
   /**
    * Apply a message to a set of profiles.
    * @param message The message to apply.
-   * @throws ExecutionException
    */
-  public void apply(JSONObject message) throws ExecutionException {
+  public void apply(JSONObject message) {
 
-    List<MessageRoute> routes = router.route(message, config, context);
-    for(MessageRoute route : routes) {
-      distributor.distribute(message, route, context);
-    }
+    // what time is it?
+    Clock clock = clockFactory.createClock(config);
+    Optional<Long> timestamp = clock.currentTimeMillis(message);
 
-    routeCount += routes.size();
-    messageCount += 1;
-  }
+    // can only route the message, if we have a timestamp
+    if(timestamp.isPresent()) {
 
-  @Override
-  public String toString() {
-    return "Profiler{" +
-            getProfileCount() + " profile(s), " +
-            getMessageCount() + " messages(s), " +
-            getRouteCount() + " route(s)" +
-            '}';
+      // route the message to the correct profile builders
+      List<MessageRoute> routes = router.route(message, config, context);
+      for (MessageRoute route : routes) {
+        distributor.distribute(message, timestamp.get(), route, context);
+      }
+
+      routeCount += routes.size();
+      messageCount += 1;
+
+    } else {
+      LOG.warn("No timestamp available for the message. The message will be ignored.");
+    }
   }
 
   /**
@@ -110,19 +142,45 @@ public class StandAloneProfiler {
     return distributor.flush();
   }
 
+  /**
+   * Returns the Profiler configuration.
+   * @return The Profiler configuration.
+   */
   public ProfilerConfig getConfig() {
     return config;
   }
 
+  /**
+   * Returns the number of defined profiles.
+   * @return The number of defined profiles.
+   */
   public int getProfileCount() {
     return (config == null) ? 0: config.getProfiles().size();
   }
 
+  /**
+   * Returns the number of messages that have been applied.
+   * @return The number of messages that have been applied.
+   */
   public int getMessageCount() {
     return messageCount;
   }
 
+  /**
+   * Returns the number of routes.
+   * @return The number of routes.
+   * @see MessageRoute
+   */
   public int getRouteCount() {
     return routeCount;
   }
+
+  @Override
+  public String toString() {
+    return "Profiler{" +
+            getProfileCount() + " profile(s), " +
+            getMessageCount() + " messages(s), " +
+            getRouteCount() + " route(s)" +
+            '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
index 6730e49..b07c0ed 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
@@ -20,16 +20,24 @@
 
 package org.apache.metron.profiler.clock;
 
+import org.json.simple.JSONObject;
+
+import java.util.Optional;
+
 /**
- * A clock can tell time; imagine that.
+ * A {@link Clock} manages the progression of time in the Profiler.
  *
- * This allows the Profiler to support different treatments of time like wall clock versus event time.
+ * <p>The Profiler can operate on either processing time or event time.  This
+ * abstraction deals with the differences between the two.
  */
 public interface Clock {
 
   /**
-   * The current time in epoch milliseconds.
+   * Returns the current time in epoch milliseconds.
+   *
+   * @param message The telemetry message.
+   * @return An optional value containing the current time in epoch milliseconds, if
+   *         the current time is known.  Otherwise, empty.
    */
-  long currentTimeMillis();
-
+  Optional<Long> currentTimeMillis(JSONObject message);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java
new file mode 100644
index 0000000..5435c48
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/ClockFactory.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+
+/**
+ * A factory for creating {@link Clock} objects.
+ *
+ * The type of {@link Clock} needed will depend on the Profiler configuration.
+ */
+public interface ClockFactory {
+
+  /**
+   * Creates and returns a {@link Clock}.
+   *
+   * @param config The profiler configuration.
+   * @return A {@link Clock}.
+   */
+  Clock createClock(ProfilerConfig config);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java
new file mode 100644
index 0000000..d62e62b
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/DefaultClockFactory.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+
+/**
+ * Creates a {@link Clock} based on the profiler configuration.
+ *
+ * <p>If the Profiler is configured to use event time, a {@link EventTimeClock} will
+ * be created.  Otherwise, a {@link WallClock} will be created.
+ *
+ * <p>The default implementation of a {@link ClockFactory}.
+ */
+public class DefaultClockFactory implements ClockFactory {
+
+  /**
+   * @param config The profiler configuration.
+   * @return The appropriate Clock based on the profiler configuration.
+   */
+  @Override
+  public Clock createClock(ProfilerConfig config) {
+    Clock clock;
+
+    boolean isEventTime = config.getTimestampField().isPresent();
+    if(isEventTime) {
+
+      // using event time
+      String timestampField = config.getTimestampField().get();
+      clock = new EventTimeClock(timestampField);
+
+    } else {
+
+      // using processing time
+      clock = new WallClock();
+    }
+
+    return clock;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java
new file mode 100644
index 0000000..5cd574e
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/EventTimeClock.java
@@ -0,0 +1,72 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Optional;
+
+/**
+ * A {@link Clock} that advances based on event time.
+ *
+ * Event time is advanced by the timestamps contained within telemetry messages, rather
+ * than the system clock.
+ */
+public class EventTimeClock implements Clock {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The name of the field from which the timestamp will
+   */
+  private String timestampField;
+
+  /**
+   * @param timestampField The name of the field containing a timestamp.
+   */
+  public EventTimeClock(String timestampField) {
+    this.timestampField = timestampField;
+  }
+
+  @Override
+  public Optional<Long> currentTimeMillis(JSONObject message) {
+
+    Long result;
+    if(message != null && message.containsKey(timestampField)) {
+
+      // extract the timestamp and convert to a long
+      Object timestamp = message.get(timestampField);
+      result = ConversionUtils.convert(timestamp, Long.class);
+
+    } else {
+
+      // the message does not contain the specified timestamp field
+      LOG.debug("message does not contain timestamp field '{}': message will be ignored: message='{}'",
+              timestampField, JSONObject.toJSONString(message));
+      result = null;
+    }
+
+    return Optional.ofNullable(result);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
index c6e93cd..8259ed0 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
@@ -20,21 +20,50 @@
 
 package org.apache.metron.profiler.clock;
 
-import java.io.Serializable;
+import org.json.simple.JSONObject;
+
+import java.util.Optional;
 
 /**
- * A clock that reports whatever time you tell it to.  Most useful for testing.
+ * A {@link Clock} that always reports the same time.
+ *
+ * <p>This is only useful for testing.
  */
-public class FixedClock implements Clock, Serializable {
+public class FixedClock implements Clock {
 
+  /**
+   * The time in milliseconds since the epoch.
+   */
   private long epochMillis;
 
+  /**
+   * Create a {@link Clock}.  The time defaults to the epoch.
+   */
+  public FixedClock() {
+    this(0);
+  }
+
+  /**
+   * Create a {@link Clock}.
+   * @param epochMillis The time in milliseconds since the epoch.
+   */
+  public FixedClock(long epochMillis) {
+    this.setTime(epochMillis);
+  }
+
+  /**
+   * Set the current time.
+   * @param epochMillis The time in milliseconds since the epoch.
+   */
   public void setTime(long epochMillis) {
     this.epochMillis = epochMillis;
   }
 
+  /**
+   * @return The time in milliseconds since the epoch.
+   */
   @Override
-  public long currentTimeMillis() {
-    return this.epochMillis;
+  public Optional<Long> currentTimeMillis(JSONObject message) {
+    return Optional.of(this.epochMillis);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java
new file mode 100644
index 0000000..b0248cd
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClockFactory.java
@@ -0,0 +1,44 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.clock;
+
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+
+/**
+ * A {@link ClockFactory} that always returns a {@link FixedClock}.
+ *
+ * <p>A {@link FixedClock} always returns the same time and is only useful for testing.
+ */
+public class FixedClockFactory implements ClockFactory {
+
+  private long timestamp;
+
+  /**
+   * @param timestamp The timestamp that all {@link Clock} objects created by this factory will report.
+   */
+  public FixedClockFactory(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public Clock createClock(ProfilerConfig config) {
+    return new FixedClock(timestamp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
index 1a20c94..20f62e3 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
@@ -20,15 +20,22 @@
 
 package org.apache.metron.profiler.clock;
 
-import java.io.Serializable;
+import org.json.simple.JSONObject;
+
+import java.util.Optional;
 
 /**
- * A clock that uses the system clock to provide wall clock time.
+ * A {@link Clock} that advances based on system time.
+ *
+ * <p>This {@link Clock} is used to advance time when the Profiler is running
+ * on processing time, rather than event time.
  */
-public class WallClock implements Clock, Serializable {
+public class WallClock implements Clock {
 
   @Override
-  public long currentTimeMillis() {
-    return System.currentTimeMillis();
+  public Optional<Long> currentTimeMillis(JSONObject message) {
+
+    // the message does not matter; use system time
+    return Optional.of(System.currentTimeMillis());
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
index ff4c289..ea9c5c6 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.metron.profiler;
 
+import com.google.common.base.Ticker;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.utils.JSONUtils;
@@ -33,6 +34,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.TimeUnit.HOURS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.junit.Assert.assertEquals;
 
 public class DefaultMessageDistributorTest {
@@ -83,16 +87,22 @@ public class DefaultMessageDistributorTest {
 
   private DefaultMessageDistributor distributor;
   private Context context;
+  private long periodDurationMillis = MINUTES.toMillis(15);
+  private long profileTimeToLiveMillis = MINUTES.toMillis(30);
+  private long maxNumberOfRoutes = Long.MAX_VALUE;
 
   @Before
   public void setup() throws Exception {
+
     context = Context.EMPTY_CONTEXT();
     JSONParser parser = new JSONParser();
     messageOne = (JSONObject) parser.parse(inputOne);
     messageTwo = (JSONObject) parser.parse(inputTwo);
+
     distributor = new DefaultMessageDistributor(
-            TimeUnit.MINUTES.toMillis(15),
-            TimeUnit.MINUTES.toMillis(30));
+            periodDurationMillis,
+            profileTimeToLiveMillis,
+            maxNumberOfRoutes);
   }
 
   /**
@@ -108,15 +118,18 @@ public class DefaultMessageDistributorTest {
    */
   @Test
   public void testDistribute() throws Exception {
+
+    // setup
+    long timestamp = 100;
     ProfileConfig definition = createDefinition(profileOne);
     String entity = (String) messageOne.get("ip_src_addr");
     MessageRoute route = new MessageRoute(definition, entity);
 
-    // distribute one message
-    distributor.distribute(messageOne, route, context);
+    // distribute one message and flush
+    distributor.distribute(messageOne, timestamp, route, context);
+    List<ProfileMeasurement> measurements = distributor.flush();
 
     // expect one measurement coming from one profile
-    List<ProfileMeasurement> measurements = distributor.flush();
     assertEquals(1, measurements.size());
     ProfileMeasurement m = measurements.get(0);
     assertEquals(definition.getProfile(), m.getProfileName());
@@ -126,12 +139,17 @@ public class DefaultMessageDistributorTest {
   @Test
   public void testDistributeWithTwoProfiles() throws Exception {
 
-    // distribute one message to the first profile
+    // setup
+    long timestamp = 100;
     String entity = (String) messageOne.get("ip_src_addr");
-    distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entity), context);
+
+    // distribute one message to the first profile
+    MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity);
+    distributor.distribute(messageOne, timestamp, routeOne, context);
 
     // distribute another message to the second profile, but same entity
-    distributor.distribute(messageOne, new MessageRoute(createDefinition(profileTwo), entity), context);
+    MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity);
+    distributor.distribute(messageOne, timestamp, routeTwo, context);
 
     // expect 2 measurements; 1 for each profile
     List<ProfileMeasurement> measurements = distributor.flush();
@@ -141,17 +159,150 @@ public class DefaultMessageDistributorTest {
   @Test
   public void testDistributeWithTwoEntities() throws Exception {
 
+    // setup
+    long timestamp = 100;
+
     // distribute one message
     String entityOne = (String) messageOne.get("ip_src_addr");
-    distributor.distribute(messageOne, new MessageRoute(createDefinition(profileOne), entityOne), context);
+    MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne);
+    distributor.distribute(messageOne, timestamp, routeOne, context);
 
     // distribute another message with a different entity
     String entityTwo = (String) messageTwo.get("ip_src_addr");
-    distributor.distribute(messageTwo, new MessageRoute(createDefinition(profileTwo), entityTwo), context);
+    MessageRoute routeTwo =  new MessageRoute(createDefinition(profileTwo), entityTwo);
+    distributor.distribute(messageTwo, timestamp, routeTwo, context);
 
     // expect 2 measurements; 1 for each entity
     List<ProfileMeasurement> measurements = distributor.flush();
     assertEquals(2, measurements.size());
   }
 
+  /**
+   * A profile should expire after a fixed period of time.  This test ensures that
+   * profiles are not expired before they are supposed to be.
+   */
+  @Test
+  public void testNotYetTimeToExpireProfiles() throws Exception {
+
+    // the ticker drives time to allow us to test cache expiration
+    FixedTicker ticker = new FixedTicker();
+
+    // setup
+    ProfileConfig definition = createDefinition(profileOne);
+    String entity = (String) messageOne.get("ip_src_addr");
+    MessageRoute route = new MessageRoute(definition, entity);
+    distributor = new DefaultMessageDistributor(
+            periodDurationMillis,
+            profileTimeToLiveMillis,
+            maxNumberOfRoutes,
+            ticker);
+
+    // distribute one message
+    distributor.distribute(messageOne, 1000000, route, context);
+
+    // advance time to just shy of the profile TTL
+    ticker.advanceTime(profileTimeToLiveMillis - 1000, MILLISECONDS);
+
+    // the profile should NOT have expired yet
+    assertEquals(0, distributor.flushExpired().size());
+    assertEquals(1, distributor.flush().size());
+  }
+
+  /**
+   * A profile should expire after a fixed period of time.
+   */
+  @Test
+  public void testProfilesShouldExpire() throws Exception {
+
+    // the ticker drives time to allow us to test cache expiration
+    FixedTicker ticker = new FixedTicker();
+
+    // setup
+    ProfileConfig definition = createDefinition(profileOne);
+    String entity = (String) messageOne.get("ip_src_addr");
+    MessageRoute route = new MessageRoute(definition, entity);
+    distributor = new DefaultMessageDistributor(
+            periodDurationMillis,
+            profileTimeToLiveMillis,
+            maxNumberOfRoutes,
+            ticker);
+
+    // distribute one message
+    distributor.distribute(messageOne, 100000, route, context);
+
+    // advance time to just beyond the period duration
+    ticker.advanceTime(profileTimeToLiveMillis + 1000, MILLISECONDS);
+
+    // the profile should have expired by now
+    assertEquals(1, distributor.flushExpired().size());
+    assertEquals(0, distributor.flush().size());
+  }
+
+  /**
+   * An expired profile is only kept around for a fixed period of time.  It should be removed, if it
+   * has been on the expired cache for too long.
+   */
+  @Test
+  public void testExpiredProfilesShouldBeRemoved() throws Exception {
+
+    // the ticker drives time to allow us to test cache expiration
+    FixedTicker ticker = new FixedTicker();
+
+    // setup
+    ProfileConfig definition = createDefinition(profileOne);
+    String entity = (String) messageOne.get("ip_src_addr");
+    MessageRoute route = new MessageRoute(definition, entity);
+    distributor = new DefaultMessageDistributor(
+            periodDurationMillis,
+            profileTimeToLiveMillis,
+            maxNumberOfRoutes,
+            ticker);
+
+    // distribute one message
+    distributor.distribute(messageOne, 1000000, route, context);
+
+    // advance time a couple of hours
+    ticker.advanceTime(2, HOURS);
+
+    // the profile should have been expired
+    assertEquals(0, distributor.flush().size());
+
+    // advance time a couple of hours
+    ticker.advanceTime(2, HOURS);
+
+    // the profile should have been removed from the expired cache
+    assertEquals(0, distributor.flushExpired().size());
+  }
+
+  /**
+   * An implementation of Ticker that can be used to drive time
+   * when testing the Guava caches.
+   */
+  private class FixedTicker extends Ticker {
+
+    /**
+     * The time that will be reported.
+     */
+    private long timestampNanos;
+
+    public FixedTicker() {
+      this.timestampNanos = Ticker.systemTicker().read();
+    }
+
+    public FixedTicker startAt(long timestampNanos) {
+      this.timestampNanos = timestampNanos;
+      return this;
+    }
+
+    public FixedTicker advanceTime(long time, TimeUnit units) {
+      this.timestampNanos += units.toNanos(time);
+      return this;
+    }
+
+    @Override
+    public long read() {
+      return this.timestampNanos;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
index d25b7ff..24eb5f8 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
@@ -23,8 +23,6 @@ package org.apache.metron.profiler;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.profiler.clock.Clock;
-import org.apache.metron.profiler.clock.FixedClock;
 import org.apache.metron.stellar.dsl.Context;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -82,7 +80,9 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testInit() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -92,7 +92,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // execute
-    builder.apply(message);
+    builder.apply(message, timestamp);
     Optional<ProfileMeasurement> m = builder.flush();
     assertTrue(m.isPresent());
 
@@ -106,7 +106,9 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testInitWithNoMessage() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -146,7 +148,9 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testUpdate() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -158,7 +162,12 @@ public class DefaultProfileBuilderTest {
     // execute
     int count = 10;
     for(int i=0; i<count; i++) {
-      builder.apply(message);
+
+      // apply the message
+      builder.apply(message, timestamp);
+
+      // advance time
+      timestamp += 5;
     }
     Optional<ProfileMeasurement> m = builder.flush();
     assertTrue(m.isPresent());
@@ -183,7 +192,9 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testResult() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -193,7 +204,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // execute
-    builder.apply(message);
+    builder.apply(message, timestamp);
     Optional<ProfileMeasurement> m = builder.flush();
     assertTrue(m.isPresent());
 
@@ -206,40 +217,38 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testProfilePeriodOnFlush() throws Exception {
-    // setup
-    FixedClock clock = new FixedClock();
-    clock.setTime(100);
 
+    // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
             .withEntity("10.0.0.1")
             .withPeriodDuration(10, TimeUnit.MINUTES)
             .withContext(Context.EMPTY_CONTEXT())
-            .withClock(clock)
             .build();
 
     {
       // apply a message and flush
-      builder.apply(message);
+      builder.apply(message, timestamp);
       Optional<ProfileMeasurement> m = builder.flush();
       assertTrue(m.isPresent());
 
       // validate the profile period
-      ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+      ProfilePeriod expected = new ProfilePeriod(timestamp, 10, TimeUnit.MINUTES);
       assertEquals(expected, m.get().getPeriod());
     }
     {
-      // advance time by at least one period - 10 minutes
-      clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
+      // advance time by at least one period... about 10 minutes
+      timestamp += TimeUnit.MINUTES.toMillis(10);
 
       // apply a message and flush again
-      builder.apply(message);
+      builder.apply(message, timestamp);
       Optional<ProfileMeasurement> m = builder.flush();
       assertTrue(m.isPresent());
 
       // validate the profile period
-      ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+      ProfilePeriod expected = new ProfilePeriod(timestamp, 10, TimeUnit.MINUTES);
       assertEquals(expected, m.get().getPeriod());
     }
   }
@@ -262,7 +271,9 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testGroupBy() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -272,7 +283,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // execute
-    builder.apply(message);
+    builder.apply(message, timestamp);
     Optional<ProfileMeasurement> m = builder.flush();
     assertTrue(m.isPresent());
 
@@ -300,23 +311,20 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testStateAvailableToGroupBy() throws Exception {
-    FixedClock clock = new FixedClock();
-    clock.setTime(1503081070340L);
-    long periodDurationMillis = TimeUnit.MINUTES.toMillis(10);
-    ProfilePeriod period = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
 
     // setup
+    long timestamp = 1503081070340L;
+    ProfilePeriod period = new ProfilePeriod(timestamp, 10, TimeUnit.MINUTES);
     definition = JSONUtils.INSTANCE.load(testStateAvailableToGroupBy, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
             .withEntity("10.0.0.1")
             .withPeriodDuration(10, TimeUnit.MINUTES)
             .withContext(Context.EMPTY_CONTEXT())
-            .withClock(clock)
             .build();
 
     // execute
-    builder.apply(message);
+    builder.apply(message, timestamp);
     Optional<ProfileMeasurement> m = builder.flush();
     assertTrue(m.isPresent());
 
@@ -350,7 +358,9 @@ public class DefaultProfileBuilderTest {
 
   @Test
   public void testFlushDoesNotClearsState() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -362,16 +372,24 @@ public class DefaultProfileBuilderTest {
     // execute - accumulate some state then flush it
     int count = 10;
     for(int i=0; i<count; i++) {
-      builder.apply(message);
+
+      // apply the message
+      builder.apply(message, timestamp);
+
+      // advance time
+      timestamp += 5;
     }
     builder.flush();
 
+    // advance time beyond the current period
+    timestamp += TimeUnit.MINUTES.toMillis(20);
+
     // apply another message to accumulate new state, then flush again to validate original state was cleared
-    builder.apply(message);
+    builder.apply(message, timestamp);
     Optional<ProfileMeasurement> m = builder.flush();
-    assertTrue(m.isPresent());
 
     // validate
+    assertTrue(m.isPresent());
     assertEquals(33, m.get().getProfileValue());
   }
 
@@ -395,7 +413,9 @@ public class DefaultProfileBuilderTest {
 
   @Test
   public void testFlushDoesNotClearsStateButInitDoes() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testFlushProfileWithNaiveInit, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -407,18 +427,27 @@ public class DefaultProfileBuilderTest {
     // execute - accumulate some state then flush it
     int count = 10;
     for(int i=0; i<count; i++) {
-      builder.apply(message);
+
+      // apply a message
+      builder.apply(message, timestamp);
+
+      // advance time
+      timestamp += 5;
     }
     builder.flush();
 
+    // advance time beyond the current period
+    timestamp += TimeUnit.MINUTES.toMillis(20);
+
     // apply another message to accumulate new state, then flush again to validate original state was cleared
-    builder.apply(message);
+    builder.apply(message, timestamp);
     Optional<ProfileMeasurement> m = builder.flush();
     assertTrue(m.isPresent());
 
     // validate
     assertEquals(3, m.get().getProfileValue());
   }
+
   /**
    * {
    *   "profile": "test",
@@ -434,7 +463,9 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testEntity() throws Exception {
+
     // setup
+    long timestamp = 100;
     final String entity = "10.0.0.1";
     definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
@@ -445,7 +476,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // execute
-    builder.apply(message);
+    builder.apply(message, timestamp);
     Optional<ProfileMeasurement> m = builder.flush();
     assertTrue(m.isPresent());
 
@@ -473,7 +504,9 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testResultWithProfileExpression() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -483,7 +516,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // execute
-    builder.apply(message);
+    builder.apply(message, timestamp);
     Optional<ProfileMeasurement> m = builder.flush();
     assertTrue(m.isPresent());
 
@@ -515,7 +548,9 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testResultWithTriageExpression() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -525,7 +560,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // execute
-    builder.apply(message);
+    builder.apply(message, timestamp);
     Optional<ProfileMeasurement> m = builder.flush();
     assertTrue(m.isPresent());
 
@@ -550,7 +585,9 @@ public class DefaultProfileBuilderTest {
 
   @Test
   public void testBadInitExpression() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(badInitProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -560,7 +597,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // due to the bad expression, there should be no result
-    builder.apply(message);
+    builder.apply(message, timestamp);
     assertFalse(builder.flush().isPresent());
   }
 
@@ -579,7 +616,9 @@ public class DefaultProfileBuilderTest {
 
   @Test
   public void testBadResultExpression() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(badSimpleResultProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -589,7 +628,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // due to the bad expression, there should be no result
-    builder.apply(message);
+    builder.apply(message, timestamp);
     assertFalse(builder.flush().isPresent());
   }
 
@@ -608,7 +647,9 @@ public class DefaultProfileBuilderTest {
 
   @Test
   public void testBadGroupByExpression() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(badGroupByProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -618,7 +659,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // due to the bad expression, there should be no result
-    builder.apply(message);
+    builder.apply(message, timestamp);
     assertFalse(builder.flush().isPresent());
   }
 
@@ -641,7 +682,9 @@ public class DefaultProfileBuilderTest {
 
   @Test
   public void testBadResultProfileExpression() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(badResultProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -651,7 +694,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // due to the bad expression, there should be no result
-    builder.apply(message);
+    builder.apply(message, timestamp);
     assertFalse(builder.flush().isPresent());
   }
 
@@ -674,7 +717,9 @@ public class DefaultProfileBuilderTest {
 
   @Test
   public void testBadResultTriageExpression() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(badResultTriage, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -684,7 +729,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // due to the bad expression, there should be no result
-    builder.apply(message);
+    builder.apply(message, timestamp);
     assertFalse(builder.flush().isPresent());
   }
 
@@ -707,7 +752,9 @@ public class DefaultProfileBuilderTest {
    */
   @Test
   public void testBadUpdateExpression() throws Exception {
+
     // setup
+    long timestamp = 100;
     definition = JSONUtils.INSTANCE.load(badUpdateProfile, ProfileConfig.class);
     builder = new DefaultProfileBuilder.Builder()
             .withDefinition(definition)
@@ -717,7 +764,7 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // execute
-    builder.apply(message);
+    builder.apply(message, timestamp);
 
     // if the update expression fails, the profile should still flush.
     Optional<ProfileMeasurement> m = builder.flush();

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
index 3a51ea4..1a72111 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
@@ -20,7 +20,6 @@
 
 package org.apache.metron.profiler;
 
-import org.apache.metron.profiler.ProfilePeriod;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;