You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/16 12:26:38 UTC
metron git commit: METRON-1704 Message Timestamp Logic Should be
Shared (nickwallen) closes apache/metron#1146
Repository: metron
Updated Branches:
refs/heads/feature/METRON-1699-create-batch-profiler 4fb920167 -> 5eff97fbe
METRON-1704 Message Timestamp Logic Should be Shared (nickwallen) closes apache/metron#1146
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/5eff97fb
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/5eff97fb
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/5eff97fb
Branch: refs/heads/feature/METRON-1699-create-batch-profiler
Commit: 5eff97fbe4a99b4d9fdec1010cfa3358cf182ddd
Parents: 4fb9201
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Aug 16 08:26:03 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Aug 16 08:26:03 2018 -0400
----------------------------------------------------------------------
.../profiler/DefaultMessageDistributor.java | 6 +-
.../metron/profiler/DefaultMessageRouter.java | 34 +++++++++--
.../metron/profiler/MessageDistributor.java | 4 +-
.../apache/metron/profiler/MessageRoute.java | 64 +++++++++++++++++++-
.../metron/profiler/StandAloneProfiler.java | 26 +++-----
.../profiler/DefaultMessageDistributorTest.java | 32 +++++-----
.../profiler/DefaultMessageRouterTest.java | 55 +++++++++++++++++
.../profiler/bolt/ProfileBuilderBolt.java | 4 +-
.../profiler/bolt/ProfileSplitterBolt.java | 37 ++++-------
.../profiler/bolt/ProfileBuilderBoltTest.java | 2 +-
.../profiler/bolt/ProfileSplitterBoltTest.java | 5 +-
11 files changed, 189 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index c926a70..d950b07 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -148,16 +148,14 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab
/**
* Distribute a message along a MessageRoute.
*
- * @param message The message that needs distributed.
- * @param timestamp The timestamp of the message.
* @param route The message route.
* @param context The Stellar execution context.
*/
@Override
- public void distribute(JSONObject message, long timestamp, MessageRoute route, Context context) {
+ public void distribute(MessageRoute route, Context context) {
try {
ProfileBuilder builder = getBuilder(route, context);
- builder.apply(message, timestamp);
+ builder.apply(route.getMessage(), route.getTimestamp());
} catch(ExecutionException e) {
LOG.error("Unexpected error", e);
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
index 19bfa8c..21ff2b1 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageRouter.java
@@ -22,6 +22,9 @@ package org.apache.metron.profiler;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.ClockFactory;
+import org.apache.metron.profiler.clock.DefaultClockFactory;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
@@ -54,10 +57,16 @@ public class DefaultMessageRouter implements MessageRouter, Serializable {
*/
private StellarStatefulExecutor executor;
+ /**
+ * Responsible for creating the {@link Clock}.
+ */
+ private ClockFactory clockFactory;
+
public DefaultMessageRouter(Context context) {
this.executor = new DefaultStellarStatefulExecutor();
StellarFunctions.initialize(context);
executor.setContext(context);
+ clockFactory = new DefaultClockFactory();
}
/**
@@ -74,7 +83,8 @@ public class DefaultMessageRouter implements MessageRouter, Serializable {
// attempt to route the message to each of the profiles
for (ProfileConfig profile: config.getProfiles()) {
- Optional<MessageRoute> route = routeToProfile(message, profile);
+ Clock clock = clockFactory.createClock(config);
+ Optional<MessageRoute> route = routeToProfile(message, profile, clock);
route.ifPresent(routes::add);
}
@@ -87,20 +97,24 @@ public class DefaultMessageRouter implements MessageRouter, Serializable {
* @param profile The profile that may need the message.
* @return A MessageRoute if the message is needed by the profile.
*/
- private Optional<MessageRoute> routeToProfile(JSONObject message, ProfileConfig profile) {
+ private Optional<MessageRoute> routeToProfile(JSONObject message, ProfileConfig profile, Clock clock) {
Optional<MessageRoute> route = Optional.empty();
// allow the profile to access the fields defined within the message
@SuppressWarnings("unchecked")
final Map<String, Object> state = (Map<String, Object>) message;
-
try {
// is this message needed by this profile?
if (executor.execute(profile.getOnlyif(), state, Boolean.class)) {
- // what is the name of the entity in this message?
- String entity = executor.execute(profile.getForeach(), state, String.class);
- route = Optional.of(new MessageRoute(profile, entity));
+ // what time is is? could be either system or event time
+ Optional<Long> timestamp = clock.currentTimeMillis(message);
+ if(timestamp.isPresent()) {
+
+ // what is the name of the entity in this message?
+ String entity = executor.execute(profile.getForeach(), state, String.class);
+ route = Optional.of(new MessageRoute(profile, entity, message, timestamp.get()));
+ }
}
} catch(Throwable e) {
@@ -111,4 +125,12 @@ public class DefaultMessageRouter implements MessageRouter, Serializable {
return route;
}
+
+ public void setExecutor(StellarStatefulExecutor executor) {
+ this.executor = executor;
+ }
+
+ public void setClockFactory(ClockFactory clockFactory) {
+ this.clockFactory = clockFactory;
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
index ea5be0f..b164207 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageDistributor.java
@@ -43,12 +43,10 @@ public interface MessageDistributor {
/**
* Distribute a message along a {@link MessageRoute}.
*
- * @param message The message that needs distributed.
- * @param timestamp The timestamp of the message.
* @param route The message route.
* @param context The Stellar execution context.
*/
- void distribute(JSONObject message, long timestamp, MessageRoute route, Context context);
+ void distribute(MessageRoute route, Context context);
/**
* Flush all active profiles.
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
index 680e4e8..e76b897 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/MessageRoute.java
@@ -20,7 +20,11 @@
package org.apache.metron.profiler;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.json.simple.JSONObject;
import java.io.Serializable;
@@ -48,14 +52,26 @@ public class MessageRoute implements Serializable {
private String entity;
/**
+ * The message taking this route.
+ */
+ private JSONObject message;
+
+ /**
+ * The timestamp of the message.
+ */
+ private Long timestamp;
+
+ /**
* Create a {@link MessageRoute}.
*
* @param profileDefinition The profile definition.
- * @param entity The entity.
+ * @param entity The entity.
*/
- public MessageRoute(ProfileConfig profileDefinition, String entity) {
+ public MessageRoute(ProfileConfig profileDefinition, String entity, JSONObject message, Long timestamp) {
this.entity = entity;
this.profileDefinition = profileDefinition;
+ this.message = message;
+ this.timestamp = timestamp;
}
public String getEntity() {
@@ -73,4 +89,48 @@ public class MessageRoute implements Serializable {
public void setProfileDefinition(ProfileConfig profileDefinition) {
this.profileDefinition = profileDefinition;
}
+
+ public JSONObject getMessage() {
+ return message;
+ }
+
+ public void setMessage(JSONObject message) {
+ this.message = message;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MessageRoute that = (MessageRoute) o;
+ return new EqualsBuilder()
+ .append(profileDefinition, that.profileDefinition)
+ .append(entity, that.entity)
+ .append(message, that.message)
+ .append(timestamp, that.timestamp)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(profileDefinition)
+ .append(entity)
+ .append(message)
+ .append(timestamp)
+ .toHashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
index f79efe6..befa296 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/StandAloneProfiler.java
@@ -112,26 +112,14 @@ public class StandAloneProfiler {
* @param message The message to apply.
*/
public void apply(JSONObject message) {
-
- // what time is it?
- Clock clock = clockFactory.createClock(config);
- Optional<Long> timestamp = clock.currentTimeMillis(message);
-
- // can only route the message, if we have a timestamp
- if(timestamp.isPresent()) {
-
- // route the message to the correct profile builders
- List<MessageRoute> routes = router.route(message, config, context);
- for (MessageRoute route : routes) {
- distributor.distribute(message, timestamp.get(), route, context);
- }
-
- routeCount += routes.size();
- messageCount += 1;
-
- } else {
- LOG.warn("No timestamp available for the message. The message will be ignored.");
+ // route the message to the correct profile builders
+ List<MessageRoute> routes = router.route(message, config, context);
+ for (MessageRoute route : routes) {
+ distributor.distribute(route, context);
}
+
+ routeCount += routes.size();
+ messageCount += 1;
}
/**
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
index ea9c5c6..48161e2 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -123,10 +123,10 @@ public class DefaultMessageDistributorTest {
long timestamp = 100;
ProfileConfig definition = createDefinition(profileOne);
String entity = (String) messageOne.get("ip_src_addr");
- MessageRoute route = new MessageRoute(definition, entity);
+ MessageRoute route = new MessageRoute(definition, entity, messageOne, timestamp);
// distribute one message and flush
- distributor.distribute(messageOne, timestamp, route, context);
+ distributor.distribute(route, context);
List<ProfileMeasurement> measurements = distributor.flush();
// expect one measurement coming from one profile
@@ -144,12 +144,12 @@ public class DefaultMessageDistributorTest {
String entity = (String) messageOne.get("ip_src_addr");
// distribute one message to the first profile
- MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity);
- distributor.distribute(messageOne, timestamp, routeOne, context);
+ MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entity, messageOne, timestamp);
+ distributor.distribute(routeOne, context);
// distribute another message to the second profile, but same entity
- MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity);
- distributor.distribute(messageOne, timestamp, routeTwo, context);
+ MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entity, messageOne, timestamp);
+ distributor.distribute(routeTwo, context);
// expect 2 measurements; 1 for each profile
List<ProfileMeasurement> measurements = distributor.flush();
@@ -164,13 +164,13 @@ public class DefaultMessageDistributorTest {
// distribute one message
String entityOne = (String) messageOne.get("ip_src_addr");
- MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne);
- distributor.distribute(messageOne, timestamp, routeOne, context);
+ MessageRoute routeOne = new MessageRoute(createDefinition(profileOne), entityOne, messageOne, timestamp);
+ distributor.distribute(routeOne, context);
// distribute another message with a different entity
String entityTwo = (String) messageTwo.get("ip_src_addr");
- MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entityTwo);
- distributor.distribute(messageTwo, timestamp, routeTwo, context);
+ MessageRoute routeTwo = new MessageRoute(createDefinition(profileTwo), entityTwo, messageTwo, timestamp);
+ distributor.distribute(routeTwo, context);
// expect 2 measurements; 1 for each entity
List<ProfileMeasurement> measurements = distributor.flush();
@@ -190,7 +190,7 @@ public class DefaultMessageDistributorTest {
// setup
ProfileConfig definition = createDefinition(profileOne);
String entity = (String) messageOne.get("ip_src_addr");
- MessageRoute route = new MessageRoute(definition, entity);
+ MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis());
distributor = new DefaultMessageDistributor(
periodDurationMillis,
profileTimeToLiveMillis,
@@ -198,7 +198,7 @@ public class DefaultMessageDistributorTest {
ticker);
// distribute one message
- distributor.distribute(messageOne, 1000000, route, context);
+ distributor.distribute(route, context);
// advance time to just shy of the profile TTL
ticker.advanceTime(profileTimeToLiveMillis - 1000, MILLISECONDS);
@@ -220,7 +220,7 @@ public class DefaultMessageDistributorTest {
// setup
ProfileConfig definition = createDefinition(profileOne);
String entity = (String) messageOne.get("ip_src_addr");
- MessageRoute route = new MessageRoute(definition, entity);
+ MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis());
distributor = new DefaultMessageDistributor(
periodDurationMillis,
profileTimeToLiveMillis,
@@ -228,7 +228,7 @@ public class DefaultMessageDistributorTest {
ticker);
// distribute one message
- distributor.distribute(messageOne, 100000, route, context);
+ distributor.distribute(route, context);
// advance time to just beyond the period duration
ticker.advanceTime(profileTimeToLiveMillis + 1000, MILLISECONDS);
@@ -251,7 +251,7 @@ public class DefaultMessageDistributorTest {
// setup
ProfileConfig definition = createDefinition(profileOne);
String entity = (String) messageOne.get("ip_src_addr");
- MessageRoute route = new MessageRoute(definition, entity);
+ MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis());
distributor = new DefaultMessageDistributor(
periodDurationMillis,
profileTimeToLiveMillis,
@@ -259,7 +259,7 @@ public class DefaultMessageDistributorTest {
ticker);
// distribute one message
- distributor.distribute(messageOne, 1000000, route, context);
+ distributor.distribute(route, context);
// advance time a couple of hours
ticker.advanceTime(2, HOURS);
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
index 534f155..f583c30 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageRouterTest.java
@@ -57,6 +57,17 @@ public class DefaultMessageRouterTest {
/**
* {
+ * "ip_src_addr": "10.0.0.1",
+ * "value": "22",
+ * "timestamp": 1531250226659
+ * }
+ */
+ @Multiline
+ private String inputWithTimestamp;
+ private JSONObject messageWithTimestamp;
+
+ /**
+ * {
* "profiles": [ ]
* }
*/
@@ -175,6 +186,23 @@ public class DefaultMessageRouterTest {
@Multiline
private String goodAndBad;
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "profile-one",
+ * "foreach": "ip_src_addr",
+ * "init": { "x": "0" },
+ * "update": { "x": "x + 1" },
+ * "result": "x"
+ * }
+ * ],
+ * "timestampField": "timestamp"
+ * }
+ */
+ @Multiline
+ private String profileWithEventTime;
+
private DefaultMessageRouter router;
private Context context;
@@ -193,6 +221,7 @@ public class DefaultMessageRouterTest {
JSONParser parser = new JSONParser();
this.messageOne = (JSONObject) parser.parse(inputOne);
this.messageTwo = (JSONObject) parser.parse(inputTwo);
+ this.messageWithTimestamp = (JSONObject) parser.parse(inputWithTimestamp);
}
@Test
@@ -268,4 +297,30 @@ public class DefaultMessageRouterTest {
assertEquals("good-profile", route1.getProfileDefinition().getProfile());
assertEquals(messageOne.get("ip_src_addr"), route1.getEntity());
}
+
+ /**
+ *
+ */
+ @Test
+ public void testMessageWithTimestamp() throws Exception {
+ List<MessageRoute> routes = router.route(messageWithTimestamp, createConfig(profileWithEventTime), context);;
+
+ assertEquals(1, routes.size());
+ MessageRoute route1 = routes.get(0);
+ assertEquals("profile-one", route1.getProfileDefinition().getProfile());
+ assertEquals(messageWithTimestamp.get("ip_src_addr"), route1.getEntity());
+ assertEquals(messageWithTimestamp.get("timestamp"), route1.getTimestamp());
+ }
+
+ /**
+ * If the timestamp of a message cannot be determined, it should not be routed.
+ *
+ * <p>This might happen when using event time and the message is missing the timestamp field.
+ */
+ @Test
+ public void testMessageWithMissingTimestamp() throws Exception {
+ // messageOne does not contain a timestamp
+ List<MessageRoute> routes = router.route(messageOne, createConfig(profileWithEventTime), context);
+ assertEquals(0, routes.size());
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 0d1f27e..f9c0edd 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -363,9 +363,9 @@ public class ProfileBuilderBolt extends BaseWindowedBolt implements Reloadable {
activeFlushSignal.update(timestamp);
// distribute the message
- MessageRoute route = new MessageRoute(definition, entity);
+ MessageRoute route = new MessageRoute(definition, entity, message, timestamp);
synchronized (messageDistributor) {
- messageDistributor.distribute(message, timestamp, route, getStellarContext());
+ messageDistributor.distribute(route, getStellarContext());
}
LOG.debug("Message distributed: profile={}, entity={}, timestamp={}", definition.getProfile(), entity, timestamp);
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index f28411f..f57deb7 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -97,11 +97,6 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
private transient MessageRouter router;
/**
- * Responsible for creating the {@link Clock}.
- */
- private transient ClockFactory clockFactory;
-
- /**
* @param zookeeperUrl The Zookeeper URL that contains the configuration for this bolt.
*/
public ProfileSplitterBolt(String zookeeperUrl) {
@@ -114,10 +109,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
this.collector = collector;
this.parser = new JSONParser();
this.router = new DefaultMessageRouter(getStellarContext());
- this.clockFactory = new DefaultClockFactory();
}
- private Context getStellarContext() {
+ public Context getStellarContext() {
Map<String, Object> global = getConfigurations().getGlobalConfig();
return new Context.Builder()
.with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
@@ -162,15 +156,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
// ensure there is a valid profiler configuration
ProfilerConfig config = getProfilerConfig();
if(config != null && config.getProfiles().size() > 0) {
+ routeMessage(input, message, config);
- // what time is it?
- Clock clock = clockFactory.createClock(config);
- Optional<Long> timestamp = clock.currentTimeMillis(message);
-
- // route the message. if a message does not contain the timestamp field, it cannot be routed.
- timestamp.ifPresent(ts -> routeMessage(input, message, config, ts));
-
- } else {
+ } else if(LOG.isDebugEnabled()) {
LOG.debug("No Profiler configuration found. Nothing to do.");
}
}
@@ -180,24 +168,23 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
* @param input The input tuple on which to anchor.
* @param message The telemetry message.
* @param config The Profiler configuration.
- * @param timestamp The timestamp of the telemetry message.
*/
- private void routeMessage(Tuple input, JSONObject message, ProfilerConfig config, Long timestamp) {
+ private void routeMessage(Tuple input, JSONObject message, ProfilerConfig config) {
// emit a tuple for each 'route'
List<MessageRoute> routes = router.route(message, config, getStellarContext());
for (MessageRoute route : routes) {
- Values values = createValues(message, timestamp, route);
+ Values values = createValues(route);
collector.emit(input, values);
LOG.debug("Found route for message; profile={}, entity={}, timestamp={}",
route.getProfileDefinition().getProfile(),
route.getEntity(),
- timestamp);
+ route.getTimestamp());
}
- LOG.debug("Found {} route(s) for message with timestamp={}", routes.size(), timestamp);
+ LOG.debug("Found {} route(s) for message", routes.size());
}
/**
@@ -222,22 +209,20 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
/**
* Creates the {@link Values} attached to the outgoing tuple.
*
- * @param message The telemetry message.
- * @param timestamp The timestamp of the message.
* @param route The route the message must take.
* @return
*/
- private Values createValues(JSONObject message, Long timestamp, MessageRoute route) {
+ private Values createValues(MessageRoute route) {
// the order here must match `declareOutputFields`
- return new Values(message, timestamp, route.getEntity(), route.getProfileDefinition());
+ return new Values(route.getMessage(), route.getTimestamp(), route.getEntity(), route.getProfileDefinition());
}
protected MessageRouter getMessageRouter() {
return router;
}
- public void setClockFactory(ClockFactory clockFactory) {
- this.clockFactory = clockFactory;
+ public void setRouter(MessageRouter router) {
+ this.router = router;
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 3d009fb..3132ae6 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -127,7 +127,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
bolt.execute(tupleWindow);
// the message should have been extracted from the tuple and passed to the MessageDistributor
- verify(distributor).distribute(eq(message1), eq(timestamp1), any(MessageRoute.class), any());
+ verify(distributor).distribute(any(MessageRoute.class), any());
}
http://git-wip-us.apache.org/repos/asf/metron/blob/5eff97fb/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index bf81923..d57e825 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -23,6 +23,7 @@ package org.apache.metron.profiler.bolt;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.DefaultMessageRouter;
import org.apache.metron.profiler.clock.FixedClockFactory;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.test.bolt.BaseBoltTest;
@@ -428,7 +429,9 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
// set the clock factory AFTER calling prepare to use the fixed clock factory
- bolt.setClockFactory(new FixedClockFactory(timestamp));
+ DefaultMessageRouter router = new DefaultMessageRouter(bolt.getStellarContext());
+ router.setClockFactory(new FixedClockFactory(timestamp));
+ bolt.setRouter(router);
return bolt;
}