You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/16 12:26:38 UTC

metron git commit: METRON-1704 Message Timestamp Logic Should be Shared (nickwallen) closes apache/metron#1146

Repository: metron
Updated Branches:
  refs/heads/feature/METRON-1699-create-batch-profiler 4fb920167 -> 5eff97fbe


METRON-1704 Message Timestamp Logic Should be Shared (nickwallen) closes apache/metron#1146


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/5eff97fb
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/5eff97fb
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/5eff97fb

Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 5eff97fbe4a99b4d9fdec1010cfa3358cf182ddd
Parents: 4fb9201
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Aug 16 08:26:03 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Aug 16 08:26:03 2018 -0400

----------------------------------------------------------------------
 .../profiler/DefaultMessageDistributor.java     |  6 +-
 .../metron/profiler/DefaultMessageRouter.java   | 34 +++++++++--
 .../metron/profiler/MessageDistributor.java     |  4 +-
 .../apache/metron/profiler/MessageRoute.java    | 64 +++++++++++++++++++-
 .../metron/profiler/StandAloneProfiler.java     | 26 +++-----
 .../profiler/DefaultMessageDistributorTest.java | 32 +++++-----
 .../profiler/DefaultMessageRouterTest.java      | 55 +++++++++++++++++
 .../profiler/bolt/ProfileBuilderBolt.java       |  4 +-
 .../profiler/bolt/ProfileSplitterBolt.java      | 37 ++++-------
 .../profiler/bolt/ProfileBuilderBoltTest.java   |  2 +-
 .../profiler/bolt/ProfileSplitterBoltTest.java  |  5 +-
 11 files changed, 189 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index c926a70..d950b07 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -148,16 +148,14 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
   /**
    * Distribute a message along a MessageRoute.
    *
-   * @param message The message that needs distributed.
-   * @param timestamp The timestamp of the message.
    * @param route The message route.
    * @param context The Stellar execution context.
    */
   @Override
-  public void distribute(JSONObject message, long timestamp, MessageRoute route, Context context) {
+  public void distribute(MessageRoute route, Context context) {
     try {
       ProfileBuilder builder = getBuilder(route, context);
-      builder.apply(message, timestamp);
+      builder.apply(route.getMessage(), route.getTimestamp());
 
     } catch(ExecutionException e) {
       LOG.error("Unexpected error", e);

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
index 19bfa8c..21ff2b1 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
@@ -22,6 +22,9 @@ package org.apache.metron.profiler;
 
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.ClockFactory;
+import org.apache.metron.profiler.clock.DefaultClockFactory;
 import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
 import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
@@ -54,10 +57,16 @@ public class DefaultMessageRouter implements MessageRouter, Serializable {
    */
   private StellarStatefulExecutor executor;
 
+  /**
+   * Responsible for creating the {@link Clock}.
+   */
+  private ClockFactory clockFactory;
+
   public DefaultMessageRouter(Context context) {
     this.executor = new DefaultStellarStatefulExecutor();
     StellarFunctions.initialize(context);
     executor.setContext(context);
+    clockFactory = new DefaultClockFactory();
   }
 
   /**
@@ -74,7 +83,8 @@ public class DefaultMessageRouter implements MessageRouter, Serializable {
 
     // attempt to route the message to each of the profiles
     for (ProfileConfig profile: config.getProfiles()) {
-      Optional<MessageRoute> route = routeToProfile(message, profile);
+      Clock clock = clockFactory.createClock(config);
+      Optional<MessageRoute> route = routeToProfile(message, profile, clock);
       route.ifPresent(routes::add);
     }
 
@@ -87,20 +97,24 @@ public class DefaultMessageRouter implements MessageRouter, Serializable {
    * @param profile The profile that may need the message.
    * @return A MessageRoute if the message is needed by the profile.
    */
-  private Optional<MessageRoute> routeToProfile(JSONObject message, ProfileConfig profile) {
+  private Optional<MessageRoute> routeToProfile(JSONObject message, ProfileConfig profile, Clock clock) {
     Optional<MessageRoute> route = Optional.empty();
 
     // allow the profile to access the fields defined within the message
     @SuppressWarnings("unchecked")
     final Map<String, Object> state = (Map<String, Object>) message;
-
     try {
       // is this message needed by this profile?
       if (executor.execute(profile.getOnlyif(), state, Boolean.class)) {
 
-        // what is the name of the entity in this message?
-        String entity = executor.execute(profile.getForeach(), state, String.class);
-        route = Optional.of(new MessageRoute(profile, entity));
+        // what time is it? could be either system or event time
+        Optional<Long> timestamp = clock.currentTimeMillis(message);
+        if(timestamp.isPresent()) {
+
+          // what is the name of the entity in this message?
+          String entity = executor.execute(profile.getForeach(), state, String.class);
+          route = Optional.of(new MessageRoute(profile, entity, message, timestamp.get()));
+        }
       }
 
     } catch(Throwable e) {
@@ -111,4 +125,12 @@ public class DefaultMessageRouter implements MessageRouter, Serializable {
 
     return route;
   }
+
+  public void setExecutor(StellarStatefulExecutor executor) {
+    this.executor = executor;
+  }
+
+  public void setClockFactory(ClockFactory clockFactory) {
+    this.clockFactory = clockFactory;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
index ea5be0f..b164207 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
@@ -43,12 +43,10 @@ public interface MessageDistributor {
   /**
    * Distribute a message along a {@link MessageRoute}.
    *
-   * @param message The message that needs distributed.
-   * @param timestamp The timestamp of the message.
    * @param route The message route.
    * @param context The Stellar execution context.
    */
-  void distribute(JSONObject message, long timestamp, MessageRoute route, Context context);
+  void distribute(MessageRoute route, Context context);
 
   /**
    * Flush all active profiles.

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
index 680e4e8..e76b897 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
@@ -20,7 +20,11 @@
 
 package org.apache.metron.profiler;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.json.simple.JSONObject;
 
 import java.io.Serializable;
 
@@ -48,14 +52,26 @@ public class MessageRoute implements Serializable {
   private String entity;
 
   /**
+   * The message taking this route.
+   */
+  private JSONObject message;
+
+  /**
+   * The timestamp of the message.
+   */
+  private Long timestamp;
+
+  /**
    * Create a {@link MessageRoute}.
    *
    * @param profileDefinition The profile definition.
-   * @param entity The entity.
+   * @param entity            The entity.
    */
-  public MessageRoute(ProfileConfig profileDefinition, String entity) {
+  public MessageRoute(ProfileConfig profileDefinition, String entity, JSONObject message, Long timestamp) {
     this.entity = entity;
     this.profileDefinition = profileDefinition;
+    this.message = message;
+    this.timestamp = timestamp;
   }
 
   public String getEntity() {
@@ -73,4 +89,48 @@ public class MessageRoute implements Serializable {
   public void setProfileDefinition(ProfileConfig profileDefinition) {
     this.profileDefinition = profileDefinition;
   }
+
+  public JSONObject getMessage() {
+    return message;
+  }
+
+  public void setMessage(JSONObject message) {
+    this.message = message;
+  }
+
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(Long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    MessageRoute that = (MessageRoute) o;
+    return new EqualsBuilder()
+            .append(profileDefinition, that.profileDefinition)
+            .append(entity, that.entity)
+            .append(message, that.message)
+            .append(timestamp, that.timestamp)
+            .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+            .append(profileDefinition)
+            .append(entity)
+            .append(message)
+            .append(timestamp)
+            .toHashCode();
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
index f79efe6..befa296 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
@@ -112,26 +112,14 @@ public class StandAloneProfiler {
    * @param message The message to apply.
    */
   public void apply(JSONObject message) {
-
-    // what time is it?
-    Clock clock = clockFactory.createClock(config);
-    Optional<Long> timestamp = clock.currentTimeMillis(message);
-
-    // can only route the message, if we have a timestamp
-    if(timestamp.isPresent()) {
-
-      // route the message to the correct profile builders
-      List<MessageRoute> routes = router.route(message, config, context);
-      for (MessageRoute route : routes) {
-        distributor.distribute(message, timestamp.get(), route, context);
-      }
-
-      routeCount += routes.size();
-      messageCount += 1;
-
-    } else {
-      LOG.warn("No timestamp available for the message. The message will be ignored.");
+    // route the message to the correct profile builders
+    List<MessageRoute> routes = router.route(message, config, context);
+    for (MessageRoute route : routes) {
+      distributor.distribute(route, context);
     }
+
+    routeCount += routes.size();
+    messageCount += 1;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
index ea9c5c6..48161e2 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -123,10 +123,10 @@ public class DefaultMessageDistributorTest {
     long timestamp = 100;
     ProfileConfig definition = createDefinition(profileOne);
     String entity = (String) messageOne.get("ip_src_addr");
-    MessageRoute route = new MessageRoute(definition, entity);
+    MessageRoute route = new MessageRoute(definition, entity, messageOne, timestamp);
 
     // distribute one message and flush
-    distributor.distribute(messageOne, timestamp, route, context);
+    distributor.distribute(route, context);
     List<ProfileMeasurement> measurements = distributor.flush();
 
     // expect one measurement coming from one profile
@@ -144,12 +144,12 @@ public class DefaultMessageDistributorTest {
     String entity = (String) messageOne.get("ip_src_addr");
 
     // distribute one message to the first profile
-    MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity);
-    distributor.distribute(messageOne, timestamp, routeOne, context);
+    MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity, messageOne, timestamp);
+    distributor.distribute(routeOne, context);
 
     // distribute another message to the second profile, but same entity
-    MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity);
-    distributor.distribute(messageOne, timestamp, routeTwo, context);
+    MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity, messageOne, timestamp);
+    distributor.distribute(routeTwo, context);
 
     // expect 2 measurements; 1 for each profile
     List<ProfileMeasurement> measurements = distributor.flush();
@@ -164,13 +164,13 @@ public class DefaultMessageDistributorTest {
 
     // distribute one message
     String entityOne = (String) messageOne.get("ip_src_addr");
-    MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne);
-    distributor.distribute(messageOne, timestamp, routeOne, context);
+    MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne, messageOne, timestamp);
+    distributor.distribute(routeOne, context);
 
     // distribute another message with a different entity
     String entityTwo = (String) messageTwo.get("ip_src_addr");
-    MessageRoute routeTwo =  new MessageRoute(createDefinition(profileTwo), entityTwo);
-    distributor.distribute(messageTwo, timestamp, routeTwo, context);
+    MessageRoute routeTwo =  new MessageRoute(createDefinition(profileTwo), entityTwo, messageTwo, timestamp);
+    distributor.distribute(routeTwo, context);
 
     // expect 2 measurements; 1 for each entity
     List<ProfileMeasurement> measurements = distributor.flush();
@@ -190,7 +190,7 @@ public class DefaultMessageDistributorTest {
     // setup
     ProfileConfig definition = createDefinition(profileOne);
     String entity = (String) messageOne.get("ip_src_addr");
-    MessageRoute route = new MessageRoute(definition, entity);
+    MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis());
     distributor = new DefaultMessageDistributor(
             periodDurationMillis,
             profileTimeToLiveMillis,
@@ -198,7 +198,7 @@ public class DefaultMessageDistributorTest {
             ticker);
 
     // distribute one message
-    distributor.distribute(messageOne, 1000000, route, context);
+    distributor.distribute(route, context);
 
     // advance time to just shy of the profile TTL
     ticker.advanceTime(profileTimeToLiveMillis - 1000, MILLISECONDS);
@@ -220,7 +220,7 @@ public class DefaultMessageDistributorTest {
     // setup
     ProfileConfig definition = createDefinition(profileOne);
     String entity = (String) messageOne.get("ip_src_addr");
-    MessageRoute route = new MessageRoute(definition, entity);
+    MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis());
     distributor = new DefaultMessageDistributor(
             periodDurationMillis,
             profileTimeToLiveMillis,
@@ -228,7 +228,7 @@ public class DefaultMessageDistributorTest {
             ticker);
 
     // distribute one message
-    distributor.distribute(messageOne, 100000, route, context);
+    distributor.distribute(route, context);
 
     // advance time to just beyond the period duration
     ticker.advanceTime(profileTimeToLiveMillis + 1000, MILLISECONDS);
@@ -251,7 +251,7 @@ public class DefaultMessageDistributorTest {
     // setup
     ProfileConfig definition = createDefinition(profileOne);
     String entity = (String) messageOne.get("ip_src_addr");
-    MessageRoute route = new MessageRoute(definition, entity);
+    MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis());
     distributor = new DefaultMessageDistributor(
             periodDurationMillis,
             profileTimeToLiveMillis,
@@ -259,7 +259,7 @@ public class DefaultMessageDistributorTest {
             ticker);
 
     // distribute one message
-    distributor.distribute(messageOne, 1000000, route, context);
+    distributor.distribute(route, context);
 
     // advance time a couple of hours
     ticker.advanceTime(2, HOURS);

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
index 534f155..f583c30 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
@@ -57,6 +57,17 @@ public class DefaultMessageRouterTest {
 
   /**
    * {
+   *   "ip_src_addr": "10.0.0.1",
+   *   "value": "22",
+   *   "timestamp": 1531250226659
+   * }
+   */
+  @Multiline
+  private String inputWithTimestamp;
+  private JSONObject messageWithTimestamp;
+
+  /**
+   * {
    *   "profiles": [ ]
    * }
    */
@@ -175,6 +186,23 @@ public class DefaultMessageRouterTest {
   @Multiline
   private String goodAndBad;
 
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile-one",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "x": "0" },
+   *        "update": { "x": "x + 1" },
+   *        "result": "x"
+   *      }
+   *   ],
+   *   "timestampField": "timestamp"
+   * }
+   */
+  @Multiline
+  private String profileWithEventTime;
+
   private DefaultMessageRouter router;
   private Context context;
 
@@ -193,6 +221,7 @@ public class DefaultMessageRouterTest {
     JSONParser parser = new JSONParser();
     this.messageOne = (JSONObject) parser.parse(inputOne);
     this.messageTwo = (JSONObject) parser.parse(inputTwo);
+    this.messageWithTimestamp = (JSONObject) parser.parse(inputWithTimestamp);
   }
 
   @Test
@@ -268,4 +297,30 @@ public class DefaultMessageRouterTest {
     assertEquals("good-profile", route1.getProfileDefinition().getProfile());
     assertEquals(messageOne.get("ip_src_addr"), route1.getEntity());
   }
+
+  /**
+   * When event time is used, the route's timestamp should be taken from the message's timestamp field.
+   */
+  @Test
+  public void testMessageWithTimestamp() throws Exception {
+    List<MessageRoute> routes = router.route(messageWithTimestamp, createConfig(profileWithEventTime), context);;
+
+    assertEquals(1, routes.size());
+    MessageRoute route1 = routes.get(0);
+    assertEquals("profile-one", route1.getProfileDefinition().getProfile());
+    assertEquals(messageWithTimestamp.get("ip_src_addr"), route1.getEntity());
+    assertEquals(messageWithTimestamp.get("timestamp"), route1.getTimestamp());
+  }
+
+  /**
+   * If the timestamp of a message cannot be determined, it should not be routed.
+   *
+   * <p>This might happen when using event time and the message is missing the timestamp field.
+   */
+  @Test
+  public void testMessageWithMissingTimestamp() throws Exception {
+    // messageOne does not contain a timestamp
+    List<MessageRoute> routes = router.route(messageOne, createConfig(profileWithEventTime), context);
+    assertEquals(0, routes.size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 0d1f27e..f9c0edd 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -363,9 +363,9 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
     activeFlushSignal.update(timestamp);
     
     // distribute the message
-    MessageRoute route = new MessageRoute(definition, entity);
+    MessageRoute route = new MessageRoute(definition, entity, message, timestamp);
     synchronized (messageDistributor) {
-      messageDistributor.distribute(message, timestamp, route, getStellarContext());
+      messageDistributor.distribute(route, getStellarContext());
     }
 
     LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", definition.getProfile(), entity, timestamp);

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index f28411f..f57deb7 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -97,11 +97,6 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   private transient MessageRouter router;
 
   /**
-   * Responsible for creating the {@link Clock}.
-   */
-  private transient ClockFactory clockFactory;
-
-  /**
    * @param zookeeperUrl The Zookeeper URL that contains the configuration for this bolt.
    */
   public ProfileSplitterBolt(String zookeeperUrl) {
@@ -114,10 +109,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
     this.collector = collector;
     this.parser = new JSONParser();
     this.router = new DefaultMessageRouter(getStellarContext());
-    this.clockFactory = new DefaultClockFactory();
   }
 
-  private Context getStellarContext() {
+  public Context getStellarContext() {
     Map<String, Object> global = getConfigurations().getGlobalConfig();
     return new Context.Builder()
             .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
@@ -162,15 +156,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
     // ensure there is a valid profiler configuration
     ProfilerConfig config = getProfilerConfig();
     if(config != null && config.getProfiles().size() > 0) {
+      routeMessage(input, message, config);
 
-      // what time is it?
-      Clock clock = clockFactory.createClock(config);
-      Optional<Long> timestamp = clock.currentTimeMillis(message);
-
-      // route the message.  if a message does not contain the timestamp field, it cannot be routed.
-      timestamp.ifPresent(ts -> routeMessage(input, message, config, ts));
-
-    } else {
+    } else if(LOG.isDebugEnabled()) {
       LOG.debug("No Profiler configuration found.  Nothing to do.");
     }
   }
@@ -180,24 +168,23 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
    * @param input The input tuple on which to anchor.
    * @param message The telemetry message.
    * @param config The Profiler configuration.
-   * @param timestamp The timestamp of the telemetry message.
    */
-  private void routeMessage(Tuple input, JSONObject message, ProfilerConfig config, Long timestamp) {
+  private void routeMessage(Tuple input, JSONObject message, ProfilerConfig config) {
 
     // emit a tuple for each 'route'
     List<MessageRoute> routes = router.route(message, config, getStellarContext());
     for (MessageRoute route : routes) {
 
-      Values values = createValues(message, timestamp, route);
+      Values values = createValues(route);
       collector.emit(input, values);
 
       LOG.debug("Found route for message; profile={}, entity={}, timestamp={}",
               route.getProfileDefinition().getProfile(),
               route.getEntity(),
-              timestamp);
+              route.getTimestamp());
     }
 
-    LOG.debug("Found {} route(s) for message with timestamp={}", routes.size(), timestamp);
+    LOG.debug("Found {} route(s) for message", routes.size());
   }
 
   /**
@@ -222,22 +209,20 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
   /**
    * Creates the {@link Values} attached to the outgoing tuple.
    *
-   * @param message The telemetry message.
-   * @param timestamp The timestamp of the message.
    * @param route The route the message must take.
    * @return
    */
-  private Values createValues(JSONObject message, Long timestamp, MessageRoute route) {
+  private Values createValues(MessageRoute route) {
 
     // the order here must match `declareOutputFields`
-    return new Values(message, timestamp, route.getEntity(), route.getProfileDefinition());
+    return new Values(route.getMessage(), route.getTimestamp(), route.getEntity(), route.getProfileDefinition());
   }
 
   protected MessageRouter getMessageRouter() {
     return router;
   }
 
-  public void setClockFactory(ClockFactory clockFactory) {
-    this.clockFactory = clockFactory;
+  public void setRouter(MessageRouter router) {
+    this.router = router;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 3d009fb..3132ae6 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -127,7 +127,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
     bolt.execute(tupleWindow);
 
     // the message should have been extracted from the tuple and passed to the MessageDistributor
-    verify(distributor).distribute(eq(message1), eq(timestamp1), any(MessageRoute.class), any());
+    verify(distributor).distribute(any(MessageRoute.class), any());
   }
 
 

http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index bf81923..d57e825 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -23,6 +23,7 @@ package org.apache.metron.profiler.bolt;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.DefaultMessageRouter;
 import org.apache.metron.profiler.clock.FixedClockFactory;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.test.bolt.BaseBoltTest;
@@ -428,7 +429,9 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
     bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
 
     // set the clock factory AFTER calling prepare to use the fixed clock factory
-    bolt.setClockFactory(new FixedClockFactory(timestamp));
+    DefaultMessageRouter router = new DefaultMessageRouter(bolt.getStellarContext());
+    router.setClockFactory(new FixedClockFactory(timestamp));
+    bolt.setRouter(router);
 
     return bolt;
   }