You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2016/12/05 19:26:16 UTC
[2/2] incubator-metron git commit: METRON-606 Profiler Overwriting
Previously Written Values (nickwallen) closes apache/incubator-metron#387
METRON-606 Profiler Overwriting Previously Written Values (nickwallen) closes apache/incubator-metron#387
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/670f50ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/670f50ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/670f50ae
Branch: refs/heads/master
Commit: 670f50ae3b949502e660d9e15a10a1da6a0c6fd4
Parents: 1734f5f
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Dec 5 14:25:41 2016 -0500
Committer: Nick Allen <ni...@nickallen.org>
Committed: Mon Dec 5 14:25:41 2016 -0500
----------------------------------------------------------------------
.../metron/profiler/client/GetProfileTest.java | 21 +-
.../client/HBaseProfilerClientTest.java | 31 +-
.../metron/profiler/client/ProfileWriter.java | 26 +-
.../apache/metron/profiler/ProfileBuilder.java | 335 ++++++++++++++++
.../metron/profiler/ProfileMeasurement.java | 41 +-
.../apache/metron/profiler/ProfilePeriod.java | 7 +
.../org/apache/metron/profiler/clock/Clock.java | 35 ++
.../metron/profiler/clock/FixedClock.java | 40 ++
.../apache/metron/profiler/clock/WallClock.java | 34 ++
.../stellar/DefaultStellarExecutor.java | 11 +-
.../profiler/stellar/StellarExecutor.java | 7 +
.../metron/profiler/ProfileBuilderTest.java | 383 ++++++++++++++++++
.../metron/profiler/ProfilePeriodTest.java | 7 +
.../profiler/hbase/SaltyRowKeyBuilderTest.java | 34 +-
.../stellar/DefaultStellarExecutorTest.java | 8 +
.../profiler/bolt/ProfileBuilderBolt.java | 327 +++-------------
.../profiler/bolt/ProfileHBaseMapper.java | 10 +-
.../metron/profiler/bolt/ProfileState.java | 101 -----
.../profiler/bolt/ProfileBuilderBoltTest.java | 385 ++++++++-----------
.../profiler/bolt/ProfileHBaseMapperTest.java | 7 +-
20 files changed, 1171 insertions(+), 679 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
index ce9965f..95582e7 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
@@ -129,7 +129,11 @@ public class GetProfileTest {
// setup - write some measurements to be read later
final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
+
profileWriter.write(m, count, group, val -> expectedValue);
// execute - read the profile values - no groups
@@ -153,7 +157,10 @@ public class GetProfileTest {
// setup - write some measurements to be read later
final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
profileWriter.write(m, count, group, val -> expectedValue);
// create a variable that contains the groups to use
@@ -180,7 +187,10 @@ public class GetProfileTest {
// setup - write some measurements to be read later
final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
profileWriter.write(m, count, group, val -> expectedValue);
// create a variable that contains the groups to use
@@ -224,7 +234,10 @@ public class GetProfileTest {
final List<Object> group = Collections.emptyList();
// setup - write a single value from 2 hours ago
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
profileWriter.write(m, 1, group, val -> expectedValue);
// create a variable that contains the groups to use
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
index 0076396..ed75a65 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -113,7 +113,11 @@ public class HBaseProfilerClientTest {
final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
// setup - write two groups of measurements - 'weekends' and 'weekdays'
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
+
profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
@@ -137,7 +141,10 @@ public class HBaseProfilerClientTest {
final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
// create two groups of measurements - one on weekdays and one on weekends
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
profileWriter.write(m, hours * periodsPerHour, Arrays.asList("weekdays"), val -> expectedValue);
profileWriter.write(m, hours * periodsPerHour, Arrays.asList("weekends"), val -> 0);
@@ -160,7 +167,10 @@ public class HBaseProfilerClientTest {
final long startTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
// setup - write some values to read later
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
profileWriter.write(m, hours * periodsPerHour, group, val -> 1000);
// execute
@@ -183,7 +193,10 @@ public class HBaseProfilerClientTest {
final long startTime = endTime - TimeUnit.HOURS.toMillis(hours);
// setup - write two groups of measurements - 'weekends' and 'weekdays'
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
@@ -210,7 +223,10 @@ public class HBaseProfilerClientTest {
final long startTime = endTime - TimeUnit.HOURS.toMillis(hours);
// create two groups of measurements - one on weekdays and one on weekends
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
@@ -235,7 +251,10 @@ public class HBaseProfilerClientTest {
final long measurementTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
// setup - write some values to read later
- ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", measurementTime, periodDuration, periodUnits);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(measurementTime, periodDuration, periodUnits);
profileWriter.write(m, numberToWrite, group, val -> 1000);
// execute
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index a4ea8af..6e2b11e 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -70,21 +70,18 @@ public class ProfileWriter {
ProfileMeasurement m = prototype;
for(int i=0; i<count; i++) {
- // create a measurement for the next profile period to be written
- ProfilePeriod next = m.getPeriod().next();
- m = new ProfileMeasurement(
- prototype.getProfileName(),
- prototype.getEntity(),
- next.getStartTimeMillis(),
- prototype.getPeriod().getDurationMillis(),
- TimeUnit.MILLISECONDS);
-
// generate the next value that should be written
Object nextValue = valueGenerator.apply(m.getValue());
- m.setValue(nextValue);
- m.setGroups(group);
- // write the measurement
+ // create a measurement for the next profile period to be written
+ ProfilePeriod next = m.getPeriod().next();
+ m = new ProfileMeasurement()
+ .withProfileName(prototype.getProfileName())
+ .withEntity(prototype.getEntity())
+ .withPeriod(next.getStartTimeMillis(), prototype.getPeriod().getDurationMillis(), TimeUnit.MILLISECONDS)
+ .withGroups(group)
+ .withValue(nextValue);
+
write(m);
}
}
@@ -115,7 +112,10 @@ public class ProfileWriter {
HTableInterface table = provider.getTable(config, "profiler");
long when = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
- ProfileMeasurement measure = new ProfileMeasurement("profile1", "192.168.66.121", when, 15, TimeUnit.MINUTES);
+ ProfileMeasurement measure = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("192.168.66.121")
+ .withPeriod(when, 15, TimeUnit.MINUTES);
ProfileWriter writer = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
writer.write(measure, 2 * 24 * 4, Collections.emptyList(), val -> new Random().nextInt(10));
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
new file mode 100644
index 0000000..2f1bc93
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
@@ -0,0 +1,335 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.WallClock;
+import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
+import org.apache.metron.profiler.stellar.StellarExecutor;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+
+/**
+ * Responsible for building and maintaining a Profile.
+ *
+ * One or more messages are applied to the Profile with `apply` and a profile measurement is
+ * produced by calling `flush`.
+ *
+ * Any one instance is responsible only for building the profile for a specific [profile, entity]
+ * pairing. There will exist many instances, one for each [profile, entity] pair that exists
+ * within the incoming telemetry data applied to the profile.
+ */
+public class ProfileBuilder implements Serializable {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(ProfileBuilder.class);
+
+ /**
+ * The name of the profile.
+ */
+ private String profileName;
+
+ /**
+ * The name of the entity.
+ */
+ private String entity;
+
+ /**
+ * The definition of the Profile that is being built.
+ */
+ private ProfileConfig definition;
+
+ /**
+ * Executes Stellar code and maintains state across multiple invocations.
+ */
+ private StellarExecutor executor;
+
+ /**
+ * Has the profile been initialized?
+ */
+ private boolean isInitialized;
+
+ /**
+ * The duration of each period in milliseconds.
+ */
+ private long periodDurationMillis;
+
+ /**
+ * A clock is used to tell time; imagine that.
+ */
+ private Clock clock;
+
+ /**
+ * Use the ProfileBuilder.Builder to create a new ProfileBuilder.
+ */
+ private ProfileBuilder(ProfileConfig definition,
+ String entity,
+ Clock clock,
+ long periodDurationMillis,
+ CuratorFramework client,
+ Map<String, Object> global) {
+
+ this.isInitialized = false;
+ this.definition = definition;
+ this.profileName = definition.getProfile();
+ this.entity = entity;
+ this.clock = clock;
+ this.periodDurationMillis = periodDurationMillis;
+ this.executor = new DefaultStellarExecutor();
+ Context context = new Context.Builder()
+ .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+ .build();
+ StellarFunctions.initialize(context);
+ this.executor.setContext(context);
+ }
+
+ /**
+ * Apply a message to the profile.
+ * @param message The message to apply.
+ */
+ public void apply(JSONObject message) {
+
+ if(!isInitialized()) {
+ assign(definition.getInit(), message, "init");
+ isInitialized = true;
+ }
+
+ assign(definition.getUpdate(), message, "update");
+ }
+
+ /**
+ * Flush the Profile.
+ *
+ * Completes and emits the ProfileMeasurement. Clears all state in preparation for
+ * the next window period.
+ * @return Returns the completed profile measurement.
+ */
+ public ProfileMeasurement flush() {
+ LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity);
+
+ // execute the 'result' expression
+ Object value = execute(definition.getResult(), new JSONObject(), "result");
+
+ // execute the 'groupBy' expression(s) - can refer to value of 'result' expression
+ List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", value), "groupBy");
+
+ // execute the 'tickUpdate' expression(s) - can refer to value of 'result' expression
+ assign(definition.getTickUpdate(), ImmutableMap.of("result", value),"tickUpdate");
+
+ // save a copy of current state then clear it to prepare for the next window
+ Map<String, Object> state = executor.getState();
+ executor.clearState();
+
+ // the 'tickUpdate' state is not flushed - make sure to bring that state along to the next period
+ definition.getTickUpdate().forEach((var, expr) -> {
+ Object val = state.get(var);
+ executor.assign(var, val);
+ });
+
+ isInitialized = false;
+
+ return new ProfileMeasurement()
+ .withProfileName(profileName)
+ .withEntity(entity)
+ .withGroups(groups)
+ .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS)
+ .withValue(value);
+ }
+
+ /**
+ * Executes an expression contained within the profile definition.
+ * @param expression The expression to execute.
+ * @param transientState Additional transient state provided to the expression.
+ * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails.
+ * @return The result of executing the expression.
+ */
+ private Object execute(String expression, Map<String, Object> transientState, String expressionType) {
+ Object result = null;
+
+ List<Object> allResults = execute(Collections.singletonList(expression), transientState, expressionType);
+ if(allResults.size() > 0) {
+ result = allResults.get(0);
+ }
+
+ return result;
+ }
+
+ /**
+ * Executes a set of expressions whose results need to be assigned to a variable.
+ * @param expressions Maps the name of a variable to the expression whose result should be assigned to it.
+ * @param transientState Additional transient state provided to the expression.
+ * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails.
+ */
+ private void assign(Map<String, String> expressions, Map<String, Object> transientState, String expressionType) {
+ try {
+
+ // execute each expression and assign its result to the named variable
+ MapUtils.emptyIfNull(expressions)
+ .forEach((var, expr) -> executor.assign(var, expr, transientState));
+
+ } catch(ParseException e) {
+
+ // make it brilliantly clear which type of expression is bad
+ String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity);
+ throw new ParseException(msg, e);
+ }
+ }
+
+ /**
+ * Executes the expressions contained within the profile definition.
+ * @param expressions A list of expressions to execute.
+ * @param transientState Additional transient state provided to the expressions.
+ * @param expressionType The type of expression; init, update, result. Provides additional context if expression execution fails.
+ * @return The result of executing each expression.
+ */
+ private List<Object> execute(List<String> expressions, Map<String, Object> transientState, String expressionType) {
+ List<Object> results = new ArrayList<>();
+
+ try {
+ ListUtils.emptyIfNull(expressions)
+ .forEach((expr) -> results.add(executor.execute(expr, transientState, Object.class)));
+
+ } catch (Throwable e) {
+ String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity);
+ throw new ParseException(msg, e);
+ }
+
+ return results;
+ }
+
+ /**
+ * Returns the current value of a variable.
+ * @param variable The name of the variable.
+ */
+ public Object valueOf(String variable) {
+ return executor.getState().get(variable);
+ }
+
+ public boolean isInitialized() {
+ return isInitialized;
+ }
+
+ public ProfileConfig getDefinition() {
+ return definition;
+ }
+
+ /**
+ * A builder used to construct a new ProfileBuilder.
+ */
+ public static class Builder {
+
+ private ProfileConfig definition;
+ private String entity;
+ private long periodDurationMillis;
+ private CuratorFramework zookeeperClient;
+ private Map<String, Object> global;
+ private Clock clock = new WallClock();
+
+ public Builder withClock(Clock clock) {
+ this.clock = clock;
+ return this;
+ }
+
+ /**
+ * @param definition The profiler definition.
+ */
+ public Builder withDefinition(ProfileConfig definition) {
+ this.definition = definition;
+ return this;
+ }
+
+ /**
+ * @param entity The name of the entity
+ */
+ public Builder withEntity(String entity) {
+ this.entity = entity;
+ return this;
+ }
+
+ /**
+ * @param duration The duration of each profile period.
+ * @param units The units used to specify the duration of the profile period.
+ */
+ public Builder withPeriodDuration(long duration, TimeUnit units) {
+ this.periodDurationMillis = units.toMillis(duration);
+ return this;
+ }
+
+ /**
+ * @param millis The duration of each profile period in milliseconds.
+ */
+ public Builder withPeriodDurationMillis(long millis) {
+ this.periodDurationMillis = millis;
+ return this;
+ }
+
+ /**
+ * @param zookeeperClient The zookeeper client.
+ */
+ public Builder withZookeeperClient(CuratorFramework zookeeperClient) {
+ this.zookeeperClient = zookeeperClient;
+ return this;
+ }
+
+ /**
+ * @param global The global configuration.
+ */
+ public Builder withGlobalConfiguration(Map<String, Object> global) {
+ // TODO how does the profile builder ever see a global that has been updated in zookeeper?
+ this.global = global;
+ return this;
+ }
+
+ /**
+ * Construct a ProfileBuilder.
+ */
+ public ProfileBuilder build() {
+
+ if(definition == null) {
+ throw new IllegalArgumentException("missing profiler definition; got null");
+ }
+ if(StringUtils.isEmpty(entity)) {
+ throw new IllegalArgumentException(format("missing entity name; got '%s'", entity));
+ }
+
+ return new ProfileBuilder(definition, entity, clock, periodDurationMillis, zookeeperClient, global);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index b20dccf..bbd17a5 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -58,18 +58,33 @@ public class ProfileMeasurement {
*/
private ProfilePeriod period;
- /**
- * @param profileName The name of the profile.
- * @param entity The name of the entity being profiled.
- * @param whenMillis When the measurement was taken in epoch milliseconds.
- * @param periodDuration The duration of each profile period.
- * @param periodUnits The units of the duration of each profile period.
- */
- public ProfileMeasurement(String profileName, String entity, long whenMillis, long periodDuration, TimeUnit periodUnits) {
+ public ProfileMeasurement() {
+ this.groups = Collections.emptyList();
+ }
+
+ public ProfileMeasurement withProfileName(String profileName) {
this.profileName = profileName;
+ return this;
+ }
+
+ public ProfileMeasurement withEntity(String entity) {
this.entity = entity;
+ return this;
+ }
+
+ public ProfileMeasurement withValue(Object value) {
+ this.value = value;
+ return this;
+ }
+
+ public ProfileMeasurement withGroups(List<Object> groups) {
+ this.groups = groups;
+ return this;
+ }
+
+ public ProfileMeasurement withPeriod(long whenMillis, long periodDuration, TimeUnit periodUnits) {
this.period = new ProfilePeriod(whenMillis, periodDuration, periodUnits);
- this.groups = Collections.emptyList();
+ return this;
}
public String getProfileName() {
@@ -88,18 +103,10 @@ public class ProfileMeasurement {
return period;
}
- public void setValue(Object value) {
- this.value = value;
- }
-
public List<Object> getGroups() {
return groups;
}
- public void setGroups(List<Object> groups) {
- this.groups = groups;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
index 1b8efc8..c466919 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
@@ -22,6 +22,8 @@ package org.apache.metron.profiler;
import java.util.concurrent.TimeUnit;
+import static java.lang.String.format;
+
/**
* The Profiler captures a ProfileMeasurement once every ProfilePeriod. There can be
* multiple ProfilePeriods every hour.
@@ -45,6 +47,11 @@ public class ProfilePeriod {
* @param units The units of the duration; hours, minutes, etc.
*/
public ProfilePeriod(long epochMillis, long duration, TimeUnit units) {
+ if(duration <= 0) {
+ throw new IllegalArgumentException(format(
+ "period duration must be greater than 0; got '%d %s'", duration, units));
+ }
+
this.durationMillis = units.toMillis(duration);
this.period = epochMillis / durationMillis;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
new file mode 100644
index 0000000..6730e49
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.clock;
+
+/**
+ * A clock can tell time; imagine that.
+ *
+ * This allows the Profiler to support different treatments of time like wall clock versus event time.
+ */
+public interface Clock {
+
+ /**
+ * The current time in epoch milliseconds.
+ */
+ long currentTimeMillis();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
new file mode 100644
index 0000000..c6e93cd
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.clock;
+
+import java.io.Serializable;
+
+/**
+ * A clock that reports whatever time you tell it to. Most useful for testing.
+ */
+public class FixedClock implements Clock, Serializable {
+
+ private long epochMillis;
+
+ public void setTime(long epochMillis) {
+ this.epochMillis = epochMillis;
+ }
+
+ @Override
+ public long currentTimeMillis() {
+ return this.epochMillis;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
new file mode 100644
index 0000000..1a20c94
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.clock;
+
+import java.io.Serializable;
+
+/**
+ * A clock that uses the system clock to provide wall clock time.
+ */
+public class WallClock implements Clock, Serializable {
+
+ @Override
+ public long currentTimeMillis() { // epoch milliseconds, read from the JVM system clock on every call
+ return System.currentTimeMillis();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
index 18997a6..27d23e2 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
@@ -20,6 +20,8 @@
package org.apache.metron.profiler.stellar;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang.ClassUtils;
import org.apache.metron.common.dsl.Context;
import org.apache.metron.common.dsl.functions.resolver.FunctionResolver;
import org.apache.metron.common.dsl.MapVariableResolver;
@@ -76,7 +78,7 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
*/
@Override
public Map<String, Object> getState() {
- return new HashMap<>(state);
+ return ImmutableMap.copyOf(state);
}
/**
@@ -95,6 +97,11 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
state.put(variable, result);
}
+ @Override
+ public void assign(String variable, Object value) {
+ state.put(variable, value);
+ }
+
/**
* Execute a Stellar expression and return the result. The internal state of the executor
* is not modified.
@@ -114,7 +121,7 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
T result = ConversionUtils.convert(resultObject, clazz);
if (result == null) {
throw new IllegalArgumentException(String.format("Unexpected type: expected=%s, actual=%s, expression=%s",
- clazz.getSimpleName(), resultObject.getClass().getSimpleName(), expression));
+ clazz.getSimpleName(), ClassUtils.getShortClassName(resultObject,"null"), expression));
}
return result;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
index 869db42..2342eb7 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
@@ -30,6 +30,13 @@ import java.util.Map;
public interface StellarExecutor {
/**
+ * Assign a variable a specific value.
+ * @param variable The variable name.
+ * @param value The value to assign to the variable.
+ */
+ void assign(String variable, Object value);
+
+ /**
* Execute an expression and assign the result to a variable. The variable is maintained
* in the context of this executor and is available to all subsequent expressions.
*
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
new file mode 100644
index 0000000..c9873af
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
@@ -0,0 +1,383 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.FixedClock;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.common.utils.ConversionUtils.convert;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the ProfileBuilder class.
+ */
+public class ProfileBuilderTest {
+
+ /**
+ * {
+ * "ip_src_addr": "10.0.0.1",
+ * "ip_dst_addr": "10.0.0.20",
+ * "value": 100
+ * }
+ */
+ @Multiline
+ private String input;
+ private JSONObject message;
+ private ProfileBuilder builder;
+ private ProfileConfig definition;
+
+ @Before
+ public void setup() throws Exception {
+ message = (JSONObject) new JSONParser().parse(input);
+ }
+
+ /**
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "init": {
+ * "x": "100",
+ * "y": "200"
+ * },
+ * "result": "x + y"
+ * }
+ */
+ @Multiline
+ private String testInitProfile;
+
+ /**
+ * Ensure that the 'init' block is executed correctly.
+ */
+ @Test
+ public void testInit() throws Exception {
+ // setup
+ definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
+ builder = new ProfileBuilder.Builder()
+ .withDefinition(definition)
+ .withEntity("10.0.0.1")
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .build();
+
+ // execute
+ builder.apply(message);
+ ProfileMeasurement m = builder.flush();
+
+ // validate that x = 100, y = 200
+ assertEquals(100 + 200, (int) convert(m.getValue(), Integer.class));
+ }
+
+ /**
+ * The 'init' block is executed only when the first message is received. If no message
+ * has been received, the 'init' block will not be executed.
+ */
+ @Test
+ public void testInitWithNoMessage() throws Exception {
+ // setup
+ definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
+ builder = new ProfileBuilder.Builder()
+ .withDefinition(definition)
+ .withEntity("10.0.0.1")
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .build();
+
+ // execute
+ ProfileMeasurement m = builder.flush();
+
+ // validate that x = 0 and y = 0 as no initialization occurred
+ assertEquals(0, (int) convert(m.getValue(), Integer.class));
+ }
+
+ /**
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "init": {
+ * "x": "0",
+ * "y": "0"
+ * },
+ * "update": {
+ * "x": "x + 1",
+ * "y": "y + 2"
+ * },
+ * "result": "x + y"
+ * }
+ */
+ @Multiline
+ private String testUpdateProfile;
+
+ /**
+ * Ensure that the 'update' expressions are executed for each message applied to the profile.
+ */
+ @Test
+ public void testUpdate() throws Exception {
+ // setup
+ definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class);
+ builder = new ProfileBuilder.Builder()
+ .withDefinition(definition)
+ .withEntity("10.0.0.1")
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .build();
+
+ // execute
+ int count = 10;
+ for(int i=0; i<count; i++) {
+ builder.apply(message);
+ }
+ ProfileMeasurement m = builder.flush();
+
+ // validate that x=0, y=0 then x+=1, y+=2 for each message
+ assertEquals(count*1 + count*2, (int) convert(m.getValue(), Integer.class));
+ }
+
+ /**
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "init": { "x": "100" },
+ * "result": "x"
+ * }
+ */
+ @Multiline
+ private String testResultProfile;
+
+ /**
+ * Ensure that the result expression is executed on a flush.
+ */
+ @Test
+ public void testResult() throws Exception {
+ // setup
+ definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
+ builder = new ProfileBuilder.Builder()
+ .withDefinition(definition)
+ .withEntity("10.0.0.1")
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .build();
+
+ // execute
+ builder.apply(message);
+ ProfileMeasurement m = builder.flush();
+
+ // validate
+ assertEquals(100, (int) convert(m.getValue(), Integer.class));
+ }
+
+ /**
+ * Ensure that time advances properly on each flush.
+ */
+ @Test
+ public void testProfilePeriodOnFlush() throws Exception {
+ // setup
+ FixedClock clock = new FixedClock();
+ clock.setTime(100);
+
+ definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
+ builder = new ProfileBuilder.Builder()
+ .withDefinition(definition)
+ .withEntity("10.0.0.1")
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .withClock(clock)
+ .build();
+
+ {
+ // apply a message and flush
+ builder.apply(message);
+ ProfileMeasurement m = builder.flush();
+
+ // validate the profile period
+ ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+ assertEquals(expected, m.getPeriod());
+ }
+ {
+ // advance time by at least one period - 10 minutes
+ clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
+
+ // apply a message and flush again
+ builder.apply(message);
+ ProfileMeasurement m = builder.flush();
+
+ // validate the profile period
+ ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+ assertEquals(expected, m.getPeriod());
+ }
+ }
+
+
+ /**
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "init": { "x": "100" },
+ * "groupBy": ["x * 1", "x * 2"],
+ * "result": "100.0"
+ * }
+ */
+ @Multiline
+ private String testGroupByProfile;
+
+ /**
+ * Ensure that the 'groupBy' expression is executed correctly.
+ */
+ @Test
+ public void testGroupBy() throws Exception {
+ // setup
+ definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class);
+ builder = new ProfileBuilder.Builder()
+ .withDefinition(definition)
+ .withEntity("10.0.0.1")
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .build();
+
+ // execute
+ builder.apply(message);
+ ProfileMeasurement m = builder.flush();
+
+ // validate
+ assertEquals(2, m.getGroups().size());
+ assertEquals(100, (int) convert(m.getGroups().get(0), Integer.class));
+ assertEquals(200, (int) convert(m.getGroups().get(1), Integer.class));
+ }
+
+ /**
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "init": {
+ * "x": "0",
+ * "y": "0"
+ * },
+ * "update": {
+ * "x": "x + 1",
+ * "y": "y + 2"
+ * },
+ * "result": "x + y"
+ * }
+ */
+ @Multiline
+ private String testFlushProfile;
+
+ @Test
+ public void testFlushClearsState() throws Exception {
+ // setup - the profile starts at x=0, y=0 and adds 1 to x and 2 to y for every message applied
+ definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
+ builder = new ProfileBuilder.Builder()
+ .withDefinition(definition)
+ .withEntity("10.0.0.1")
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .build();
+
+ // execute - accumulate some state (10 messages) then flush it; this first measurement is deliberately ignored
+ int count = 10;
+ for(int i=0; i<count; i++) {
+ builder.apply(message);
+ }
+ builder.flush();
+
+ // apply another message to accumulate new state, then flush again to validate original state was cleared
+ builder.apply(message);
+ ProfileMeasurement m = builder.flush();
+
+ // validate - only the single message applied after the first flush counts: x=1, y=2, so x + y = 3
+ assertEquals(3, (int) convert(m.getValue(), Integer.class));
+ }
+
+ /**
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "result": "100"
+ * }
+ */
+ @Multiline
+ private String testEntityProfile;
+
+ /**
+ * Ensure that the entity is correctly set on the resulting profile measurements.
+ */
+ @Test
+ public void testEntity() throws Exception {
+ // setup - build from 'testEntityProfile', the profile definition declared for this test
+ final String entity = "10.0.0.1";
+ definition = JSONUtils.INSTANCE.load(testEntityProfile, ProfileConfig.class);
+ builder = new ProfileBuilder.Builder()
+ .withDefinition(definition)
+ .withEntity(entity)
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .build();
+
+ // execute
+ builder.apply(message);
+ ProfileMeasurement m = builder.flush();
+
+ // validate - the measurement carries the same entity that was handed to the builder
+ assertEquals(entity, m.getEntity());
+ }
+
+ /**
+ * {
+ * "profile": "test",
+ * "foreach": "ip_src_addr",
+ * "tickUpdate": {
+ * "ticks": "ticks + 1"
+ * },
+ * "result": "if exists(ticks) then ticks else 0"
+ * }
+ */
+ @Multiline
+ private String testTickUpdateProfile;
+
+ @Test
+ public void testTickUpdate() throws Exception {
+ // setup
+ definition = JSONUtils.INSTANCE.load(testTickUpdateProfile, ProfileConfig.class);
+ builder = new ProfileBuilder.Builder()
+ .withDefinition(definition)
+ .withEntity("10.0.0.1")
+ .withPeriodDuration(10, TimeUnit.MINUTES)
+ .build();
+
+ // 'tickUpdate' only executed when flushed - 'result' only has access to the 'old' tick value, not latest
+ {
+ ProfileMeasurement m = builder.flush();
+ assertEquals(0, (int) convert(m.getValue(), Integer.class));
+ }
+
+ // execute many flushes
+ int count = 10;
+ for(int i=0; i<count; i++) {
+ builder.flush();
+ }
+
+ {
+ // validate - the tickUpdate state should not be cleared between periods and is only run once per period
+ ProfileMeasurement m = builder.flush();
+ assertEquals(11, (int) convert(m.getValue(), Integer.class));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
index 0173511..3a51ea4 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
@@ -118,4 +118,11 @@ public class ProfilePeriodTest {
assertEquals(previous.getDurationMillis(), next.getDurationMillis());
});
}
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPeriodDurationOfZero() {
+ long duration = 0;
+ TimeUnit units = TimeUnit.HOURS;
+ new ProfilePeriod(0, duration, units);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
index c83f998..5d7d121 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
@@ -20,6 +20,7 @@
package org.apache.metron.profiler.hbase;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.storm.tuple.Tuple;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
@@ -64,8 +65,10 @@ public class SaltyRowKeyBuilderTest {
public void setup() throws Exception {
// a profile measurement
- measurement = new ProfileMeasurement("profile", "entity", AUG2016, periodDuration, periodUnits);
- measurement.setValue(22);
+ measurement = new ProfileMeasurement()
+ .withProfileName("profile")
+ .withEntity("entity")
+ .withPeriod(AUG2016, periodDuration, periodUnits);
// the tuple will contain the original message
tuple = mock(Tuple.class);
@@ -80,8 +83,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithOneGroup() throws Exception {
// setup
- List<Object> groups = Arrays.asList("group1");
- measurement.setGroups(groups);
+ measurement.withGroups(Arrays.asList("group1"));
// the expected row key
ByteBuffer buffer = ByteBuffer
@@ -107,8 +109,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithTwoGroups() throws Exception {
// setup
- List<Object> groups = Arrays.asList("group1","group2");
- measurement.setGroups(groups);
+ measurement.withGroups(Arrays.asList("group1","group2"));
// the expected row key
ByteBuffer buffer = ByteBuffer
@@ -135,8 +136,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithOneIntegerGroup() throws Exception {
// setup
- List<Object> groups = Arrays.asList(200);
- measurement.setGroups(groups);
+ measurement.withGroups(Arrays.asList(200));
// the expected row key
ByteBuffer buffer = ByteBuffer
@@ -162,8 +162,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithMixedGroups() throws Exception {
// setup
- List<Object> groups = Arrays.asList(200, "group1");
- measurement.setGroups(groups);
+ measurement.withGroups(Arrays.asList(200, "group1"));
// the expected row key
ByteBuffer buffer = ByteBuffer
@@ -190,8 +189,7 @@ public class SaltyRowKeyBuilderTest {
@Test
public void testRowKeyWithNoGroup() throws Exception {
// setup
- List<Object> groups = Collections.emptyList();
- measurement.setGroups(groups);
+ measurement.withGroups(Collections.emptyList());
// the expected row key
ByteBuffer buffer = ByteBuffer
@@ -224,8 +222,11 @@ public class SaltyRowKeyBuilderTest {
// a dummy profile measurement
long now = System.currentTimeMillis();
long oldest = now - TimeUnit.HOURS.toMillis(hoursAgo);
- ProfileMeasurement m = new ProfileMeasurement("profile", "entity", oldest, periodDuration, periodUnits);
- m.setValue(22);
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile")
+ .withEntity("entity")
+ .withPeriod(oldest, periodDuration, periodUnits)
+ .withValue(22);
// generate a list of expected keys
List<byte[]> expectedKeys = new ArrayList<>();
@@ -237,7 +238,10 @@ public class SaltyRowKeyBuilderTest {
// advance to the next period
ProfilePeriod next = m.getPeriod().next();
- m = new ProfileMeasurement("profile", "entity", next.getStartTimeMillis(), periodDuration, periodUnits);
+ m = new ProfileMeasurement()
+ .withProfileName("profile")
+ .withEntity("entity")
+ .withPeriod(next.getStartTimeMillis(), periodDuration, periodUnits);
}
// execute
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
index d90c699..3110329 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
@@ -22,6 +22,11 @@ package org.apache.metron.profiler.stellar;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.functions.StringFunctions;
+import org.apache.metron.common.dsl.functions.resolver.ClasspathFunctionResolver;
+import org.apache.metron.common.dsl.functions.resolver.FunctionResolver;
+import org.apache.metron.common.dsl.functions.resolver.SimpleFunctionResolver;
+import org.apache.metron.common.field.validation.primitive.IntegerValidation;
import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -66,6 +71,9 @@ public class DefaultStellarExecutorTest {
// create the executor to test
executor = new DefaultStellarExecutor();
executor.setContext(Context.EMPTY_CONTEXT());
+
+ ClasspathFunctionResolver resolver = new ClasspathFunctionResolver();
+ executor.setFunctionResolver(resolver);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 3129910..1fcba30 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -22,37 +22,30 @@ package org.apache.metron.profiler.bolt;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import org.apache.commons.beanutils.BeanMap;
import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.dsl.Context;
-import org.apache.metron.common.dsl.ParseException;
-import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.profiler.ProfileBuilder;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
-import org.apache.metron.profiler.stellar.StellarExecutor;
+import org.apache.metron.profiler.clock.WallClock;
import org.apache.storm.Config;
-import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
-import static org.apache.commons.collections.CollectionUtils.isEmpty;
/**
* A bolt that is responsible for building a Profile.
@@ -84,7 +77,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
/**
* Maintains the state of a profile which is unique to a profile/entity pair.
*/
- private transient Cache<String, ProfileState> profileCache;
+ private transient Cache<String, ProfileBuilder> profileCache;
/**
* Parses JSON messages.
@@ -115,11 +108,11 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
super.prepare(stormConf, context, collector);
if(timeToLiveMillis < periodDurationMillis) {
- String msg = String.format("invalid configuration: expect profile TTL (%d) greater than period duration (%d)",
- timeToLiveMillis, periodDurationMillis);
- throw new IllegalStateException(msg);
+ throw new IllegalStateException(format(
+ "invalid configuration: expect profile TTL (%d) to be greater than period duration (%d)",
+ timeToLiveMillis,
+ periodDurationMillis));
}
-
this.collector = collector;
this.parser = new JSONParser();
this.profileCache = CacheBuilder
@@ -160,207 +153,23 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
*/
private void doExecute(Tuple input) throws ExecutionException {
- if(isTickTuple(input)) {
- flush(input);
-
- } else {
- if (!isInitialized(input)) {
- init(input);
- }
- update(input);
- }
- }
-
- /**
- * Initialize the bolt. Occurs when the first tuple is received at the start
- * of each window period.
- * @param input The input tuple
- */
- private void init(Tuple input) throws ExecutionException {
-
- ProfileState state = getProfileState(input);
- try {
-
- // the original telemetry message is provided as additional context for the 'update' expressions
- JSONObject message = getMessage(input);
-
- // execute the 'init' expression
- Map<String, String> expressions = state.getDefinition().getInit();
- expressions.forEach((var, expr) -> state.getExecutor().assign(var, expr, message));
-
- } catch(ParseException e) {
-
- // make it brilliantly clear that one of the 'init' expressions is bad
- ProfileMeasurement measurement = state.getMeasurement();
- String msg = format("Bad 'init' expression: %s, profile=%s, entity=%s",
- e.getMessage(), measurement.getProfileName(), measurement.getEntity());
- throw new ParseException(msg, e);
- }
- }
-
- /**
- * Update the Profile based on data contained in a new message.
- * @param input The tuple containing a new message.
- */
- private void update(Tuple input) throws ExecutionException {
-
- ProfileState state = getProfileState(input);
- try {
-
- // the original telemetry message is provided as additional context for the 'update' expressions
- JSONObject message = getMessage(input);
-
- // execute each of the 'update' expressions
- Map<String, String> expressions = state.getDefinition().getUpdate();
- expressions.forEach((var, expr) -> state.getExecutor().assign(var, expr, message));
+ if(TupleUtils.isTick(input)) {
- } catch(ParseException e) {
-
- // make it brilliantly clear that one of the 'update' expressions is bad
- ProfileMeasurement measurement = state.getMeasurement();
- String msg = format("Bad 'update' expression: %s, profile=%s, entity=%s",
- e.getMessage(), measurement.getProfileName(), measurement.getEntity());
- throw new ParseException(msg, e);
- }
- }
-
- /**
- * Flush the Profiles.
- *
- * Executed on a fixed time period when a tick tuple is received. Completes
- * and emits the ProfileMeasurement. Clears all state in preparation for
- * the next window period.
- */
- private void flush(Tuple tickTuple) {
-
- // flush each of the profiles maintain by this bolt
- profileCache.asMap().forEach((key, profileState) -> {
-
- ProfileMeasurement measurement = profileState.getMeasurement();
- StellarExecutor executor = profileState.getExecutor();
- ProfileConfig definition = profileState.getDefinition();
- LOG.info(String.format("Flushing profile: profile=%s, entity=%s", measurement.getProfileName(), measurement.getEntity()));
-
- // execute the 'result' and 'group by' expressions
- Object value = executeResult(definition.getResult(), executor, measurement);
- measurement.setValue(value);
-
- List<Object> groups = executeGroupBy(definition.getGroupBy(), executor, measurement);
- measurement.setGroups(groups);
-
- // emit the completed profile measurement
- emit(measurement, definition);
-
- // execute the update with the old state
- Map<String, String> tickUpdate = definition.getTickUpdate();
- Map<String, Object> state = executor.getState();
- if(tickUpdate != null) {
- tickUpdate.forEach((var, expr) -> executor.assign(var, expr, Collections.singletonMap("result", value)));
- }
-
- // clear the execution state to prepare for the next window
- executor.clearState();
-
- //make sure that we bring along the update state
- if(tickUpdate != null) {
- tickUpdate.forEach((var, expr) -> executor.getState().put(var, state.get(var)));
- }
+ // when a 'tick' is received, flush the profile and emit the completed profile measurement
+ profileCache.asMap().forEach((key, profileBuilder) -> {
+ ProfileMeasurement measurement = profileBuilder.flush();
+ collector.emit(new Values(measurement, profileBuilder.getDefinition()));
+ });
// cache maintenance
profileCache.cleanUp();
- });
- }
-
- /**
- * Create the state necessary to build a Profile.
- * @param tuple The tuple that needs applied to a profile.
- */
- private ProfileState createProfileState(Tuple tuple) {
-
- // extract the profile definition
- ProfileConfig profileDefinition = getProfileDefinition(tuple);
-
- // create the profile measurement which will be emitted at the end of the window period
- ProfileMeasurement measurement = new ProfileMeasurement(
- profileDefinition.getProfile(),
- getEntity(tuple),
- getTimestamp(),
- periodDurationMillis,
- TimeUnit.MILLISECONDS);
-
- // create the executor
- StellarExecutor executor = new DefaultStellarExecutor();
- Context context = new Context.Builder()
- .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
- .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
- .build();
- StellarFunctions.initialize(context);
- executor.setContext(context);
-
- // create the profile state which is maintained within a cache for a fixed period of time
- ProfileState state = new ProfileState();
- state.setExecutor(executor);
- state.setDefinition(profileDefinition);
- state.setMeasurement(measurement);
-
- return state;
- }
-
- /**
- * Executes the 'result' expression of a Profile.
- * @return The result of evaluating the 'result' expression.
- */
- private Object executeResult(String expression, StellarExecutor executor, ProfileMeasurement measurement) {
- Object result;
- try {
- result = executor.execute(expression, new JSONObject(), Object.class);
-
- } catch(ParseException e) {
- String msg = format("Bad 'result' expression: %s, profile=%s, entity=%s",
- e.getMessage(), measurement.getProfileName(), measurement.getEntity());
- throw new ParseException(msg, e);
- }
- return result;
- }
+ } else {
- /**
- * Executes each of the 'groupBy' expressions. The result of each
- * expression are the groups used to sort the data as part of the
- * row key.
- * @param expressions The 'groupBy' expressions to execute.
- * @return The result of executing the 'groupBy' expressions.
- */
- private List<Object> executeGroupBy(List<String> expressions, StellarExecutor executor, ProfileMeasurement measurement) {
- List<Object> groups = new ArrayList<>();
-
- if(!isEmpty(expressions)) {
- try {
- // allows each 'groupBy' expression to refer to the fields of the ProfileMeasurement
- BeanMap measureAsMap = new BeanMap(measurement);
-
- for (String expr : expressions) {
- Object result = executor.execute(expr, measureAsMap, Object.class);
- groups.add(result);
- }
-
- } catch(Throwable e) {
- String msg = format("Bad 'groupBy' expression: %s, profile=%s, entity=%s",
- e.getMessage(), measurement.getProfileName(), measurement.getEntity());
- throw new ParseException(msg, e);
- }
+ // telemetry message provides additional context for 'init' and 'update' expressions
+ JSONObject message = getField("message", input, JSONObject.class);
+ getBuilder(input).apply(message);
}
-
- return groups;
- }
-
- /**
- * Emits a message containing a ProfileMeasurement and the Profile configuration.
- * @param measurement The completed ProfileMeasurement.
- * @param definition The profile definition.
- */
- private void emit(ProfileMeasurement measurement, ProfileConfig definition) {
- collector.emit(new Values(measurement, definition));
}
/**
@@ -368,80 +177,44 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
* @param tuple A tuple.
*/
private String cacheKey(Tuple tuple) {
- return String.format("%s:%s", getProfileDefinition(tuple).getProfile(), getEntity(tuple));
+ return format("%s:%s",
+ getField("profile", tuple, ProfileConfig.class),
+ getField("entity", tuple, String.class));
}
/**
- * Retrieves the state associated with a Profile. If none exists, the state will
- * be initialized.
+ * Retrieves the cached ProfileBuilder that is used to build and maintain the Profile. If none exists,
+ * one will be created and returned.
* @param tuple The tuple.
*/
- protected ProfileState getProfileState(Tuple tuple) throws ExecutionException {
- return profileCache.get(cacheKey(tuple), () -> createProfileState(tuple));
- }
-
- /**
- * Extracts the profile definition from a tuple.
- * @param tuple The tuple sent by the splitter bolt.
- */
- private ProfileConfig getProfileDefinition(Tuple tuple) {
- ProfileConfig definition = (ProfileConfig) tuple.getValueByField("profile");
- if(definition == null) {
- throw new IllegalStateException("invalid tuple received: missing profile definition");
- }
-
- return definition;
- }
-
- /**
- * Extracts the name of the entity from a tuple.
- * @param tuple The tuple sent by the splitter bolt.
- */
- private String getEntity(Tuple tuple) {
- String entity = tuple.getStringByField("entity");
- if(entity == null) {
- throw new IllegalStateException("invalid tuple received: missing entity name");
+ protected ProfileBuilder getBuilder(Tuple tuple) throws ExecutionException {
+ return profileCache.get(
+ cacheKey(tuple),
+ () -> new ProfileBuilder.Builder()
+ .withDefinition(getField("profile", tuple, ProfileConfig.class))
+ .withEntity(getField("entity", tuple, String.class))
+ .withPeriodDurationMillis(periodDurationMillis)
+ .withGlobalConfiguration(getConfigurations().getGlobalConfig())
+ .withZookeeperClient(client)
+ .withClock(new WallClock())
+ .build());
+ }
+
+ /**
+ * Retrieves an expected field from a Tuple. If the field is missing an exception is thrown to
+ * indicate a fatal error.
+ * @param fieldName The name of the field.
+ * @param tuple The tuple from which to retrieve the field.
+ * @param clazz The type of the field value.
+ * @param <T> The type of the field value.
+ */
+ private <T> T getField(String fieldName, Tuple tuple, Class<T> clazz) {
+ T value = ConversionUtils.convert(tuple.getValueByField(fieldName), clazz);
+ if(value == null) {
+ throw new IllegalStateException(format("invalid tuple received: missing field '%s'", fieldName));
}
- return entity;
- }
-
- /**
- * Extracts the original telemetry message from a tuple.
- * @param input The tuple sent by the splitter bolt.
- */
- private JSONObject getMessage(Tuple input) {
- JSONObject message = (JSONObject) input.getValueByField("message");
- if(message == null) {
- throw new IllegalStateException("invalid tuple received: missing message");
- }
-
- return message;
- }
-
- /**
- * Returns a value that can be used as the current timestamp. Allows subclasses
- * to override, if necessary.
- */
- private long getTimestamp() {
- return System.currentTimeMillis();
- }
-
- /**
- * Has the Stellar execution environment already been initialized
- * @return True, it it has been initialized.
- */
- private boolean isInitialized(Tuple tuple) {
- return profileCache.getIfPresent(cacheKey(tuple)) != null;
- }
-
- /**
- * Is this a tick tuple?
- * @param tuple The tuple
- */
- protected static boolean isTickTuple(Tuple tuple) {
- return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) &&
- Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ return value;
}
public ProfileBuilderBolt withPeriodDurationMillis(long periodDurationMillis) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
index fdea3e9..3e7b4d4 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
@@ -20,10 +20,7 @@
package org.apache.metron.profiler.bolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.commons.beanutils.BeanMap;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.dsl.ParseException;
import org.apache.metron.hbase.bolt.mapper.ColumnList;
import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
import org.apache.metron.profiler.ProfileMeasurement;
@@ -31,15 +28,10 @@ import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.profiler.stellar.StellarExecutor;
+import org.apache.storm.tuple.Tuple;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Optional;
-import static java.lang.String.format;
-import static org.apache.commons.collections.CollectionUtils.isEmpty;
-
/**
* An HbaseMapper that defines how a ProfileMeasurement is persisted within an HBase table.
*/
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java
deleted file mode 100644
index f6c48b4..0000000
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.stellar.StellarExecutor;
-
-/**
- * The state that must be maintained for each [profile, entity] pair when building a Profile.
- */
-public class ProfileState {
-
- /**
- * A ProfileMeasurement is created and emitted each window period. A Profile
- * itself is composed of many ProfileMeasurements.
- */
- private ProfileMeasurement measurement;
-
- /**
- * The definition of the Profile that the bolt is building.
- */
- private ProfileConfig definition;
-
- /**
- * Executes Stellar code and maintains state across multiple invocations.
- */
- private StellarExecutor executor;
-
- public ProfileMeasurement getMeasurement() {
- return measurement;
- }
-
- public void setMeasurement(ProfileMeasurement measurement) {
- this.measurement = measurement;
- }
-
- public ProfileConfig getDefinition() {
- return definition;
- }
-
- public void setDefinition(ProfileConfig definition) {
- this.definition = definition;
- }
-
- public StellarExecutor getExecutor() {
- return executor;
- }
-
- public void setExecutor(StellarExecutor executor) {
- this.executor = executor;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ProfileState that = (ProfileState) o;
-
- if (measurement != null ? !measurement.equals(that.measurement) : that.measurement != null) return false;
- if (definition != null ? !definition.equals(that.definition) : that.definition != null) return false;
- return executor != null ? executor.equals(that.executor) : that.executor == null;
-
- }
-
- @Override
- public int hashCode() {
- int result = measurement != null ? measurement.hashCode() : 0;
- result = 31 * result + (definition != null ? definition.hashCode() : 0);
- result = 31 * result + (executor != null ? executor.hashCode() : 0);
- return result;
- }
-
- @Override
- public String toString() {
- return "ProfileState{" +
- "measurement=" + measurement +
- ", definition=" + definition +
- ", executor=" + executor +
- '}';
- }
-}