You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2016/12/05 19:26:16 UTC

[2/2] incubator-metron git commit: METRON-606 Profiler Overwriting Previously Written Values (nickwallen) closes apache/incubator-metron#387

METRON-606 Profiler Overwriting Previously Written Values (nickwallen) closes apache/incubator-metron#387


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/670f50ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/670f50ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/670f50ae

Branch: refs/heads/master
Commit: 670f50ae3b949502e660d9e15a10a1da6a0c6fd4
Parents: 1734f5f
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Dec 5 14:25:41 2016 -0500
Committer: Nick Allen <ni...@nickallen.org>
Committed: Mon Dec 5 14:25:41 2016 -0500

----------------------------------------------------------------------
 .../metron/profiler/client/GetProfileTest.java  |  21 +-
 .../client/HBaseProfilerClientTest.java         |  31 +-
 .../metron/profiler/client/ProfileWriter.java   |  26 +-
 .../apache/metron/profiler/ProfileBuilder.java  | 335 ++++++++++++++++
 .../metron/profiler/ProfileMeasurement.java     |  41 +-
 .../apache/metron/profiler/ProfilePeriod.java   |   7 +
 .../org/apache/metron/profiler/clock/Clock.java |  35 ++
 .../metron/profiler/clock/FixedClock.java       |  40 ++
 .../apache/metron/profiler/clock/WallClock.java |  34 ++
 .../stellar/DefaultStellarExecutor.java         |  11 +-
 .../profiler/stellar/StellarExecutor.java       |   7 +
 .../metron/profiler/ProfileBuilderTest.java     | 383 ++++++++++++++++++
 .../metron/profiler/ProfilePeriodTest.java      |   7 +
 .../profiler/hbase/SaltyRowKeyBuilderTest.java  |  34 +-
 .../stellar/DefaultStellarExecutorTest.java     |   8 +
 .../profiler/bolt/ProfileBuilderBolt.java       | 327 +++-------------
 .../profiler/bolt/ProfileHBaseMapper.java       |  10 +-
 .../metron/profiler/bolt/ProfileState.java      | 101 -----
 .../profiler/bolt/ProfileBuilderBoltTest.java   | 385 ++++++++-----------
 .../profiler/bolt/ProfileHBaseMapperTest.java   |   7 +-
 20 files changed, 1171 insertions(+), 679 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
index ce9965f..95582e7 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
@@ -129,7 +129,11 @@ public class GetProfileTest {
 
     // setup - write some measurements to be read later
     final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+
     profileWriter.write(m, count, group, val -> expectedValue);
 
     // execute - read the profile values - no groups
@@ -153,7 +157,10 @@ public class GetProfileTest {
 
     // setup - write some measurements to be read later
     final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
     profileWriter.write(m, count, group, val -> expectedValue);
 
     // create a variable that contains the groups to use
@@ -180,7 +187,10 @@ public class GetProfileTest {
 
     // setup - write some measurements to be read later
     final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
     profileWriter.write(m, count, group, val -> expectedValue);
 
     // create a variable that contains the groups to use
@@ -224,7 +234,10 @@ public class GetProfileTest {
     final List<Object> group = Collections.emptyList();
 
     // setup - write a single value from 2 hours ago
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
     profileWriter.write(m, 1, group, val -> expectedValue);
 
     // create a variable that contains the groups to use

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
index 0076396..ed75a65 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -113,7 +113,11 @@ public class HBaseProfilerClientTest {
     final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
 
     // setup - write two groups of measurements - 'weekends' and 'weekdays'
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
+
     profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
     profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
 
@@ -137,7 +141,10 @@ public class HBaseProfilerClientTest {
     final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
 
     // create two groups of measurements - one on weekdays and one on weekends
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
     profileWriter.write(m, hours * periodsPerHour, Arrays.asList("weekdays"), val -> expectedValue);
     profileWriter.write(m, hours * periodsPerHour, Arrays.asList("weekends"), val -> 0);
 
@@ -160,7 +167,10 @@ public class HBaseProfilerClientTest {
     final long startTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
 
     // setup - write some values to read later
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
     profileWriter.write(m, hours * periodsPerHour, group, val -> 1000);
 
     // execute
@@ -183,7 +193,10 @@ public class HBaseProfilerClientTest {
     final long startTime = endTime - TimeUnit.HOURS.toMillis(hours);
 
     // setup - write two groups of measurements - 'weekends' and 'weekdays'
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
     profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
     profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
 
@@ -210,7 +223,10 @@ public class HBaseProfilerClientTest {
     final long startTime = endTime - TimeUnit.HOURS.toMillis(hours);
 
     // create two groups of measurements - one on weekdays and one on weekends
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(startTime, periodDuration, periodUnits);
     profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
     profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
 
@@ -235,7 +251,10 @@ public class HBaseProfilerClientTest {
     final long measurementTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
 
     // setup - write some values to read later
-    ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", measurementTime, periodDuration, periodUnits);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("entity1")
+            .withPeriod(measurementTime, periodDuration, periodUnits);
     profileWriter.write(m, numberToWrite, group, val -> 1000);
 
     // execute

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index a4ea8af..6e2b11e 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -70,21 +70,18 @@ public class ProfileWriter {
     ProfileMeasurement m = prototype;
     for(int i=0; i<count; i++) {
 
-      // create a measurement for the next profile period to be written
-      ProfilePeriod next = m.getPeriod().next();
-      m = new ProfileMeasurement(
-              prototype.getProfileName(),
-              prototype.getEntity(),
-              next.getStartTimeMillis(),
-              prototype.getPeriod().getDurationMillis(),
-              TimeUnit.MILLISECONDS);
-
       // generate the next value that should be written
       Object nextValue = valueGenerator.apply(m.getValue());
-      m.setValue(nextValue);
-      m.setGroups(group);
 
-      // write the measurement
+      // create a measurement for the next profile period to be written
+      ProfilePeriod next = m.getPeriod().next();
+      m = new ProfileMeasurement()
+              .withProfileName(prototype.getProfileName())
+              .withEntity(prototype.getEntity())
+              .withPeriod(next.getStartTimeMillis(), prototype.getPeriod().getDurationMillis(), TimeUnit.MILLISECONDS)
+              .withGroups(group)
+              .withValue(nextValue);
+
       write(m);
     }
   }
@@ -115,7 +112,10 @@ public class ProfileWriter {
     HTableInterface table = provider.getTable(config, "profiler");
 
     long when = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
-    ProfileMeasurement measure = new ProfileMeasurement("profile1", "192.168.66.121", when, 15, TimeUnit.MINUTES);
+    ProfileMeasurement measure = new ProfileMeasurement()
+            .withProfileName("profile1")
+            .withEntity("192.168.66.121")
+            .withPeriod(when, 15, TimeUnit.MINUTES);
 
     ProfileWriter writer = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
     writer.write(measure, 2 * 24 * 4, Collections.emptyList(), val -> new Random().nextInt(10));

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
new file mode 100644
index 0000000..2f1bc93
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
@@ -0,0 +1,335 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.collections4.ListUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.WallClock;
+import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
+import org.apache.metron.profiler.stellar.StellarExecutor;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static java.lang.String.format;
+
+/**
+ * Responsible for building and maintaining a Profile.
+ *
+ * One or more messages are applied to the Profile with `apply` and a profile measurement is
+ * produced by calling `flush`.
+ *
+ * Any one instance is responsible only for building the profile for a specific [profile, entity]
+ * pairing.  There will exist many instances, one for each [profile, entity] pair that exists
+ * within the incoming telemetry data applied to the profile.
+ */
+public class ProfileBuilder implements Serializable {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(ProfileBuilder.class);
+
+  /**
+   * The name of the profile.
+   */
+  private String profileName;
+
+  /**
+   * The name of the entity.
+   */
+  private String entity;
+
+  /**
+   * The definition of the Profile that the bolt is building.
+   */
+  private ProfileConfig definition;
+
+  /**
+   * Executes Stellar code and maintains state across multiple invocations.
+   */
+  private StellarExecutor executor;
+
+  /**
+   * Has the profile been initialized?
+   */
+  private boolean isInitialized;
+
+  /**
+   * The duration of each period in milliseconds.
+   */
+  private long periodDurationMillis;
+
+  /**
+   * A clock is used to tell time; imagine that.
+   */
+  private Clock clock;
+
+  /**
+   * Use the ProfileBuilder.Builder to create a new ProfileBuilder.
+   */
+  private ProfileBuilder(ProfileConfig definition,
+                         String entity,
+                         Clock clock,
+                         long periodDurationMillis,
+                         CuratorFramework client,
+                         Map<String, Object> global) {
+
+    this.isInitialized = false;
+    this.definition = definition;
+    this.profileName = definition.getProfile();
+    this.entity = entity;
+    this.clock = clock;
+    this.periodDurationMillis = periodDurationMillis;
+    this.executor = new DefaultStellarExecutor();
+    Context context = new Context.Builder()
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+            .build();
+    StellarFunctions.initialize(context);
+    this.executor.setContext(context);
+  }
+
+  /**
+   * Apply a message to the profile.
+   * @param message The message to apply.
+   */
+  public void apply(JSONObject message) {
+
+    if(!isInitialized()) {
+      assign(definition.getInit(), message, "init");
+      isInitialized = true;
+    }
+
+    assign(definition.getUpdate(), message, "update");
+  }
+
+  /**
+   * Flush the Profile.
+   *
+   * Completes and emits the ProfileMeasurement.  Clears all state in preparation for
+   * the next window period.
+   * @return Returns the completed profile measurement.
+   */
+  public ProfileMeasurement flush() {
+    LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity);
+
+    // execute the 'result' expression
+    Object value = execute(definition.getResult(), new JSONObject(), "result");
+
+    // execute the 'groupBy' expression(s) - can refer to value of 'result' expression
+    List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", value), "groupBy");
+
+    // execute the 'tickUpdate' expression(s) - can refer to value of 'result' expression
+    assign(definition.getTickUpdate(), ImmutableMap.of("result", value),"tickUpdate");
+
+    // save a copy of current state then clear it to prepare for the next window
+    Map<String, Object> state = executor.getState();
+    executor.clearState();
+
+    // the 'tickUpdate' state is not flushed - make sure to bring that state along to the next period
+    definition.getTickUpdate().forEach((var, expr) -> {
+      Object val = state.get(var);
+      executor.assign(var, val);
+    });
+
+    isInitialized = false;
+
+    return new ProfileMeasurement()
+            .withProfileName(profileName)
+            .withEntity(entity)
+            .withGroups(groups)
+            .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS)
+            .withValue(value);
+  }
+
+  /**
+   * Executes an expression contained within the profile definition.
+   * @param expression The expression to execute.
+   * @param transientState Additional transient state provided to the expression.
+   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
+   * @return The result of executing the expression.
+   */
+  private Object execute(String expression, Map<String, Object> transientState, String expressionType) {
+    Object result = null;
+
+    List<Object> allResults = execute(Collections.singletonList(expression), transientState, expressionType);
+    if(allResults.size() > 0) {
+      result = allResults.get(0);
+    }
+
+    return result;
+  }
+
+  /**
+   * Executes a set of expressions whose results need to be assigned to a variable.
+   * @param expressions Maps the name of a variable to the expression whose result should be assigned to it.
+   * @param transientState Additional transient state provided to the expression.
+   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
+   */
+  private void assign(Map<String, String> expressions, Map<String, Object> transientState, String expressionType) {
+    try {
+
+      // execute each expression and assign its result to the named variable
+      MapUtils.emptyIfNull(expressions)
+              .forEach((var, expr) -> executor.assign(var, expr, transientState));
+
+    } catch(ParseException e) {
+
+      // make it brilliantly clear which type of expression is bad
+      String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity);
+      throw new ParseException(msg, e);
+    }
+  }
+
+  /**
+   * Executes the expressions contained within the profile definition.
+   * @param expressions A list of expressions to execute.
+   * @param transientState Additional transient state provided to the expressions.
+   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
+   * @return The result of executing each expression.
+   */
+  private List<Object> execute(List<String> expressions, Map<String, Object> transientState, String expressionType) {
+    List<Object> results = new ArrayList<>();
+
+    try {
+      ListUtils.emptyIfNull(expressions)
+              .forEach((expr) -> results.add(executor.execute(expr, transientState, Object.class)));
+
+    } catch (Throwable e) {
+      String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity);
+      throw new ParseException(msg, e);
+    }
+
+    return results;
+  }
+
+  /**
+   * Returns the current value of a variable.
+   * @param variable The name of the variable.
+   */
+  public Object valueOf(String variable) {
+    return executor.getState().get(variable);
+  }
+
+  public boolean isInitialized() {
+    return isInitialized;
+  }
+
+  public ProfileConfig getDefinition() {
+    return definition;
+  }
+
+  /**
+   * A builder used to construct a new ProfileBuilder.
+   */
+  public static class Builder {
+
+    private ProfileConfig definition;
+    private String entity;
+    private long periodDurationMillis;
+    private CuratorFramework zookeeperClient;
+    private Map<String, Object> global;
+    private Clock clock = new WallClock();
+
+    public Builder withClock(Clock clock) {
+      this.clock = clock;
+      return this;
+    }
+
+    /**
+     * @param definition The profiler definition.
+     */
+    public Builder withDefinition(ProfileConfig definition) {
+      this.definition = definition;
+      return this;
+    }
+
+    /**
+     * @param entity The name of the entity
+     */
+    public Builder withEntity(String entity) {
+      this.entity = entity;
+      return this;
+    }
+
+    /**
+     * @param duration The duration of each profile period.
+     * @param units The units used to specify the duration of the profile period.
+     */
+    public Builder withPeriodDuration(long duration, TimeUnit units) {
+      this.periodDurationMillis = units.toMillis(duration);
+      return this;
+    }
+
+    /**
+     * @param millis The duration of each profile period in milliseconds.
+     */
+    public Builder withPeriodDurationMillis(long millis) {
+      this.periodDurationMillis = millis;
+      return this;
+    }
+
+    /**
+     * @param zookeeperClient The zookeeper client.
+     */
+    public Builder withZookeeperClient(CuratorFramework zookeeperClient) {
+      this.zookeeperClient = zookeeperClient;
+      return this;
+    }
+
+    /**
+     * @param global The global configuration.
+     */
+    public Builder withGlobalConfiguration(Map<String, Object> global) {
+      // TODO how does the profile builder ever see a global that has been updated in zookeeper?
+      this.global = global;
+      return this;
+    }
+
+    /**
+     * Construct a ProfileBuilder.
+     */
+    public ProfileBuilder build() {
+
+      if(definition == null) {
+        throw new IllegalArgumentException("missing profiler definition; got null");
+      }
+      if(StringUtils.isEmpty(entity)) {
+        throw new IllegalArgumentException(format("missing entity name; got '%s'", entity));
+      }
+
+      return new ProfileBuilder(definition, entity, clock, periodDurationMillis, zookeeperClient, global);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index b20dccf..bbd17a5 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -58,18 +58,33 @@ public class ProfileMeasurement {
    */
   private ProfilePeriod period;
 
-  /**
-   * @param profileName The name of the profile.
-   * @param entity The name of the entity being profiled.
-   * @param whenMillis When the measurement was taken in epoch milliseconds.
-   * @param periodDuration The duration of each profile period.
-   * @param periodUnits The units of the duration of each profile period.
-   */
-  public ProfileMeasurement(String profileName, String entity, long whenMillis, long periodDuration, TimeUnit periodUnits) {
+  public ProfileMeasurement() {
+    this.groups = Collections.emptyList();
+  }
+
+  public ProfileMeasurement withProfileName(String profileName) {
     this.profileName = profileName;
+    return this;
+  }
+
+  public ProfileMeasurement withEntity(String entity) {
     this.entity = entity;
+    return this;
+  }
+
+  public ProfileMeasurement withValue(Object value) {
+    this.value = value;
+    return this;
+  }
+
+  public ProfileMeasurement withGroups(List<Object> groups) {
+    this.groups = groups;
+    return this;
+  }
+
+  public ProfileMeasurement withPeriod(long whenMillis, long periodDuration, TimeUnit periodUnits) {
     this.period = new ProfilePeriod(whenMillis, periodDuration, periodUnits);
-    this.groups = Collections.emptyList();
+    return this;
   }
 
   public String getProfileName() {
@@ -88,18 +103,10 @@ public class ProfileMeasurement {
     return period;
   }
 
-  public void setValue(Object value) {
-    this.value = value;
-  }
-
   public List<Object> getGroups() {
     return groups;
   }
 
-  public void setGroups(List<Object> groups) {
-    this.groups = groups;
-  }
-
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
index 1b8efc8..c466919 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
@@ -22,6 +22,8 @@ package org.apache.metron.profiler;
 
 import java.util.concurrent.TimeUnit;
 
+import static java.lang.String.format;
+
 /**
  * The Profiler captures a ProfileMeasurement once every ProfilePeriod.  There can be
  * multiple ProfilePeriods every hour.
@@ -45,6 +47,11 @@ public class ProfilePeriod {
    * @param units The units of the duration; hours, minutes, etc.
    */
   public ProfilePeriod(long epochMillis, long duration, TimeUnit units) {
+    if(duration <= 0) {
+      throw new IllegalArgumentException(format(
+              "period duration must be greater than 0; got '%d %s'", duration, units));
+    }
+
     this.durationMillis = units.toMillis(duration);
     this.period = epochMillis / durationMillis;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
new file mode 100644
index 0000000..6730e49
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/Clock.java
@@ -0,0 +1,35 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.clock;
+
+/**
+ * A clock can tell time; imagine that.
+ *
+ * This allows the Profiler to support different treatments of time like wall clock versus event time.
+ */
+public interface Clock {
+
+  /**
+   * The current time in epoch milliseconds.
+   */
+  long currentTimeMillis();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
new file mode 100644
index 0000000..c6e93cd
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/FixedClock.java
@@ -0,0 +1,40 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.clock;
+
+import java.io.Serializable;
+
+/**
+ * A clock that reports whatever time you tell it to.  Most useful for testing.
+ */
+public class FixedClock implements Clock, Serializable {
+
+  private long epochMillis;
+
+  public void setTime(long epochMillis) {
+    this.epochMillis = epochMillis;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return this.epochMillis;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
new file mode 100644
index 0000000..1a20c94
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/clock/WallClock.java
@@ -0,0 +1,34 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.clock;
+
+import java.io.Serializable;
+
+/**
+ * A clock that uses the system clock to provide wall clock time.
+ */
+public class WallClock implements Clock, Serializable {
+
+  @Override
+  public long currentTimeMillis() {
+    return System.currentTimeMillis();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
index 18997a6..27d23e2 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
@@ -20,6 +20,8 @@
 
 package org.apache.metron.profiler.stellar;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang.ClassUtils;
 import org.apache.metron.common.dsl.Context;
 import org.apache.metron.common.dsl.functions.resolver.FunctionResolver;
 import org.apache.metron.common.dsl.MapVariableResolver;
@@ -76,7 +78,7 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
    */
   @Override
   public Map<String, Object> getState() {
-    return new HashMap<>(state);
+    return ImmutableMap.copyOf(state);
   }
 
   /**
@@ -95,6 +97,11 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
     state.put(variable, result);
   }
 
+  @Override
+  public void assign(String variable, Object value) {
+    state.put(variable, value);
+  }
+
   /**
    * Execute a Stellar expression and return the result.  The internal state of the executor
    * is not modified.
@@ -114,7 +121,7 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
     T result = ConversionUtils.convert(resultObject, clazz);
     if (result == null) {
       throw new IllegalArgumentException(String.format("Unexpected type: expected=%s, actual=%s, expression=%s",
-              clazz.getSimpleName(), resultObject.getClass().getSimpleName(), expression));
+              clazz.getSimpleName(), ClassUtils.getShortClassName(resultObject,"null"), expression));
     }
 
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
index 869db42..2342eb7 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
@@ -30,6 +30,13 @@ import java.util.Map;
 public interface StellarExecutor {
 
   /**
+   * Assign a variable a specific value.
+   * @param variable The variable name.
+   * @param value The value to assign to the variable.
+   */
+  void assign(String variable, Object value);
+
+  /**
    * Execute an expression and assign the result to a variable.  The variable is maintained
    * in the context of this executor and is available to all subsequent expressions.
    *

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
new file mode 100644
index 0000000..c9873af
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
@@ -0,0 +1,383 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.clock.Clock;
+import org.apache.metron.profiler.clock.FixedClock;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.common.utils.ConversionUtils.convert;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the ProfileBuilder class.
+ */
+public class ProfileBuilderTest {
+
+  /**
+   * {
+   *   "ip_src_addr": "10.0.0.1",
+   *   "ip_dst_addr": "10.0.0.20",
+   *   "value": 100
+   * }
+   */
+  @Multiline
+  private String input;
+  private JSONObject message;
+  private ProfileBuilder builder;
+  private ProfileConfig definition;
+
+  @Before
+  public void setup() throws Exception {
+    message = (JSONObject) new JSONParser().parse(input);
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *     "x": "100",
+   *     "y": "200"
+   *   },
+   *   "result": "x + y"
+   * }
+   */
+  @Multiline
+  private String testInitProfile;
+
+  /**
+   * Ensure that the 'init' block is executed correctly.
+   */
+  @Test
+  public void testInit() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate that x = 100, y = 200
+    assertEquals(100 + 200, (int) convert(m.getValue(), Integer.class));
+  }
+
+  /**
+   * The 'init' block is executed only when the first message is received.  If no message
+   * has been received, the 'init' block will not be executed.
+   */
+  @Test
+  public void testInitWithNoMessage() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testInitProfile, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    ProfileMeasurement m = builder.flush();
+
+    // validate that x = 0 and y = 0 as no initialization occurred
+    assertEquals(0, (int) convert(m.getValue(), Integer.class));
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *     "x": "0",
+   *     "y": "0"
+   *   },
+   *   "update": {
+   *     "x": "x + 1",
+   *     "y": "y + 2"
+   *   },
+   *   "result": "x + y"
+   * }
+   */
+  @Multiline
+  private String testUpdateProfile;
+
+  /**
+   * Ensure that the 'update' expressions are executed for each message applied to the profile.
+   */
+  @Test
+  public void testUpdate() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testUpdateProfile, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    int count = 10;
+    for(int i=0; i<count; i++) {
+      builder.apply(message);
+    }
+    ProfileMeasurement m = builder.flush();
+
+    // validate that x=0, y=0 then x+=1, y+=2 for each message
+    assertEquals(count*1 + count*2, (int) convert(m.getValue(), Integer.class));
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": { "x": "100" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String testResultProfile;
+
+  /**
+   * Ensure that the result expression is executed on a flush.
+   */
+  @Test
+  public void testResult() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(100, (int) convert(m.getValue(), Integer.class));
+  }
+
+  /**
+   * Ensure that time advances properly on each flush.
+   */
+  @Test
+  public void testProfilePeriodOnFlush() throws Exception {
+    // setup
+    FixedClock clock = new FixedClock();
+    clock.setTime(100);
+
+    definition = JSONUtils.INSTANCE.load(testResultProfile, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withClock(clock)
+            .build();
+
+    {
+      // apply a message and flush
+      builder.apply(message);
+      ProfileMeasurement m = builder.flush();
+
+      // validate the profile period
+      ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+      assertEquals(expected, m.getPeriod());
+    }
+    {
+      // advance time by at least one period - 10 minutes
+      clock.setTime(clock.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10));
+
+      // apply a message and flush again
+      builder.apply(message);
+      ProfileMeasurement m = builder.flush();
+
+      // validate the profile period
+      ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES);
+      assertEquals(expected, m.getPeriod());
+    }
+  }
+
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": { "x": "100" },
+   *   "groupBy": ["x * 1", "x * 2"],
+   *   "result": "100.0"
+   * }
+   */
+  @Multiline
+  private String testGroupByProfile;
+
+  /**
+   * Ensure that the 'groupBy' expression is executed correctly.
+   */
+  @Test
+  public void testGroupBy() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testGroupByProfile, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(2, m.getGroups().size());
+    assertEquals(100, (int) convert(m.getGroups().get(0), Integer.class));
+    assertEquals(200, (int) convert(m.getGroups().get(1), Integer.class));
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *     "x": "0",
+   *     "y": "0"
+   *   },
+   *   "update": {
+   *     "x": "x + 1",
+   *     "y": "y + 2"
+   *   },
+   *   "result": "x + y"
+   * }
+   */
+  @Multiline
+  private String testFlushProfile;
+
+  @Test
+  public void testFlushClearsState() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testFlushProfile, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute - accumulate some state then flush it
+    int count = 10;
+    for(int i=0; i<count; i++) {
+      builder.apply(message);
+    }
+    builder.flush();
+
+    // apply another message to accumulate new state, then flush again to validate original state was cleared
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(3, (int) convert(m.getValue(), Integer.class));
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "result": "100"
+   * }
+   */
+  @Multiline
+  private String testEntityProfile;
+
+  /**
+   * Ensure that the entity given to the builder is set on the flushed profile measurement.
+   */
+  @Test
+  public void testEntity() throws Exception {
+    // setup - use the minimal profile declared for this test, not the flush test's fixture
+    final String entity = "10.0.0.1";
+    definition = JSONUtils.INSTANCE.load(testEntityProfile, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity(entity)
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate - the measurement carries the same entity the builder was created with
+    assertEquals(entity, m.getEntity());
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "tickUpdate": {
+   *     "ticks": "ticks + 1"
+   *   },
+   *   "result": "if exists(ticks) then ticks else 0"
+   * }
+   */
+  @Multiline
+  private String testTickUpdateProfile;
+
+  @Test
+  public void testTickUpdate() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testTickUpdateProfile, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // 'tickUpdate' only executed when flushed - 'result' only has access to the 'old' tick value, not latest
+    {
+      ProfileMeasurement m = builder.flush();
+      assertEquals(0, (int) convert(m.getValue(), Integer.class));
+    }
+
+    // execute many flushes
+    int count = 10;
+    for(int i=0; i<count; i++) {
+      builder.flush();
+    }
+
+    {
+      // validate - the tickUpdate state should not be cleared between periods and is only run once per period
+      ProfileMeasurement m = builder.flush();
+      assertEquals(11, (int) convert(m.getValue(), Integer.class));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
index 0173511..3a51ea4 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
@@ -118,4 +118,11 @@ public class ProfilePeriodTest {
       assertEquals(previous.getDurationMillis(), next.getDurationMillis());
     });
   }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testPeriodDurationOfZero() {
+    long duration = 0;
+    TimeUnit units = TimeUnit.HOURS;
+    new ProfilePeriod(0, duration, units);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
index c83f998..5d7d121 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.metron.profiler.hbase;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.ProfilePeriod;
@@ -64,8 +65,10 @@ public class SaltyRowKeyBuilderTest {
   public void setup() throws Exception {
 
     // a profile measurement
-    measurement = new ProfileMeasurement("profile", "entity", AUG2016, periodDuration, periodUnits);
-    measurement.setValue(22);
+    measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(AUG2016, periodDuration, periodUnits);
 
     // the tuple will contain the original message
     tuple = mock(Tuple.class);
@@ -80,8 +83,7 @@ public class SaltyRowKeyBuilderTest {
   @Test
   public void testRowKeyWithOneGroup() throws Exception {
     // setup
-    List<Object> groups = Arrays.asList("group1");
-    measurement.setGroups(groups);
+    measurement.withGroups(Arrays.asList("group1"));
 
     // the expected row key
     ByteBuffer buffer = ByteBuffer
@@ -107,8 +109,7 @@ public class SaltyRowKeyBuilderTest {
   @Test
   public void testRowKeyWithTwoGroups() throws Exception {
     // setup
-    List<Object> groups = Arrays.asList("group1","group2");
-    measurement.setGroups(groups);
+    measurement.withGroups(Arrays.asList("group1","group2"));
 
     // the expected row key
     ByteBuffer buffer = ByteBuffer
@@ -135,8 +136,7 @@ public class SaltyRowKeyBuilderTest {
   @Test
   public void testRowKeyWithOneIntegerGroup() throws Exception {
     // setup
-    List<Object> groups = Arrays.asList(200);
-    measurement.setGroups(groups);
+    measurement.withGroups(Arrays.asList(200));
 
     // the expected row key
     ByteBuffer buffer = ByteBuffer
@@ -162,8 +162,7 @@ public class SaltyRowKeyBuilderTest {
   @Test
   public void testRowKeyWithMixedGroups() throws Exception {
     // setup
-    List<Object> groups = Arrays.asList(200, "group1");
-    measurement.setGroups(groups);
+    measurement.withGroups(Arrays.asList(200, "group1"));
 
     // the expected row key
     ByteBuffer buffer = ByteBuffer
@@ -190,8 +189,7 @@ public class SaltyRowKeyBuilderTest {
   @Test
   public void testRowKeyWithNoGroup() throws Exception {
     // setup
-    List<Object> groups = Collections.emptyList();
-    measurement.setGroups(groups);
+    measurement.withGroups(Collections.emptyList());
 
     // the expected row key
     ByteBuffer buffer = ByteBuffer
@@ -224,8 +222,11 @@ public class SaltyRowKeyBuilderTest {
     // a dummy profile measurement
     long now = System.currentTimeMillis();
     long oldest = now - TimeUnit.HOURS.toMillis(hoursAgo);
-    ProfileMeasurement m = new ProfileMeasurement("profile", "entity", oldest, periodDuration, periodUnits);
-    m.setValue(22);
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(oldest, periodDuration, periodUnits)
+            .withValue(22);
 
     // generate a list of expected keys
     List<byte[]> expectedKeys = new ArrayList<>();
@@ -237,7 +238,10 @@ public class SaltyRowKeyBuilderTest {
 
       // advance to the next period
       ProfilePeriod next = m.getPeriod().next();
-      m = new ProfileMeasurement("profile", "entity", next.getStartTimeMillis(), periodDuration, periodUnits);
+      m = new ProfileMeasurement()
+              .withProfileName("profile")
+              .withEntity("entity")
+              .withPeriod(next.getStartTimeMillis(), periodDuration, periodUnits);
     }
 
     // execute

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
index d90c699..3110329 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
@@ -22,6 +22,11 @@ package org.apache.metron.profiler.stellar;
 
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.functions.StringFunctions;
+import org.apache.metron.common.dsl.functions.resolver.ClasspathFunctionResolver;
+import org.apache.metron.common.dsl.functions.resolver.FunctionResolver;
+import org.apache.metron.common.dsl.functions.resolver.SimpleFunctionResolver;
+import org.apache.metron.common.field.validation.primitive.IntegerValidation;
 import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -66,6 +71,9 @@ public class DefaultStellarExecutorTest {
     // create the executor to test
     executor = new DefaultStellarExecutor();
     executor.setContext(Context.EMPTY_CONTEXT());
+
+    ClasspathFunctionResolver resolver = new ClasspathFunctionResolver();
+    executor.setFunctionResolver(resolver);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 3129910..1fcba30 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -22,37 +22,30 @@ package org.apache.metron.profiler.bolt;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
-import org.apache.commons.beanutils.BeanMap;
 import org.apache.metron.common.bolt.ConfiguredProfilerBolt;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.dsl.Context;
-import org.apache.metron.common.dsl.ParseException;
-import org.apache.metron.common.dsl.StellarFunctions;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.profiler.ProfileBuilder;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
-import org.apache.metron.profiler.stellar.StellarExecutor;
+import org.apache.metron.profiler.clock.WallClock;
 import org.apache.storm.Config;
-import org.apache.storm.Constants;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.TupleUtils;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static java.lang.String.format;
-import static org.apache.commons.collections.CollectionUtils.isEmpty;
 
 /**
  * A bolt that is responsible for building a Profile.
@@ -84,7 +77,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   /**
    * Maintains the state of a profile which is unique to a profile/entity pair.
    */
-  private transient Cache<String, ProfileState> profileCache;
+  private transient Cache<String, ProfileBuilder> profileCache;
 
   /**
    * Parses JSON messages.
@@ -115,11 +108,11 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     super.prepare(stormConf, context, collector);
 
     if(timeToLiveMillis < periodDurationMillis) {
-      String msg = String.format("invalid configuration: expect profile TTL (%d) greater than period duration (%d)",
-              timeToLiveMillis, periodDurationMillis);
-      throw new IllegalStateException(msg);
+      throw new IllegalStateException(format(
+              "invalid configuration: expect profile TTL (%d) to be greater than period duration (%d)",
+              timeToLiveMillis,
+              periodDurationMillis));
     }
-
     this.collector = collector;
     this.parser = new JSONParser();
     this.profileCache = CacheBuilder
@@ -160,207 +153,23 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
    */
   private void doExecute(Tuple input) throws ExecutionException {
 
-    if(isTickTuple(input)) {
-      flush(input);
-
-    } else {
-      if (!isInitialized(input)) {
-        init(input);
-      }
-      update(input);
-    }
-  }
-
-  /**
-   * Initialize the bolt.  Occurs when the first tuple is received at the start
-   * of each window period.
-   * @param input The input tuple
-   */
-  private void init(Tuple input) throws ExecutionException {
-
-    ProfileState state = getProfileState(input);
-    try {
-
-      // the original telemetry message is provided as additional context for the 'update' expressions
-      JSONObject message = getMessage(input);
-
-      // execute the 'init' expression
-      Map<String, String> expressions = state.getDefinition().getInit();
-      expressions.forEach((var, expr) -> state.getExecutor().assign(var, expr, message));
-
-    } catch(ParseException e) {
-
-      // make it brilliantly clear that one of the 'init' expressions is bad
-      ProfileMeasurement measurement = state.getMeasurement();
-      String msg = format("Bad 'init' expression: %s, profile=%s, entity=%s",
-              e.getMessage(), measurement.getProfileName(), measurement.getEntity());
-      throw new ParseException(msg, e);
-    }
-  }
-
-  /**
-   * Update the Profile based on data contained in a new message.
-   * @param input The tuple containing a new message.
-   */
-  private void update(Tuple input) throws ExecutionException {
-
-    ProfileState state = getProfileState(input);
-    try {
-
-      // the original telemetry message is provided as additional context for the 'update' expressions
-      JSONObject message = getMessage(input);
-
-      // execute each of the 'update' expressions
-      Map<String, String> expressions = state.getDefinition().getUpdate();
-      expressions.forEach((var, expr) -> state.getExecutor().assign(var, expr, message));
+    if(TupleUtils.isTick(input)) {
 
-    } catch(ParseException e) {
-
-      // make it brilliantly clear that one of the 'update' expressions is bad
-      ProfileMeasurement measurement = state.getMeasurement();
-      String msg = format("Bad 'update' expression: %s, profile=%s, entity=%s",
-              e.getMessage(), measurement.getProfileName(), measurement.getEntity());
-      throw new ParseException(msg, e);
-    }
-  }
-
-  /**
-   * Flush the Profiles.
-   *
-   * Executed on a fixed time period when a tick tuple is received.  Completes
-   * and emits the ProfileMeasurement.  Clears all state in preparation for
-   * the next window period.
-   */
-  private void flush(Tuple tickTuple) {
-
-    // flush each of the profiles maintain by this bolt
-    profileCache.asMap().forEach((key, profileState) -> {
-
-      ProfileMeasurement measurement = profileState.getMeasurement();
-      StellarExecutor executor = profileState.getExecutor();
-      ProfileConfig definition = profileState.getDefinition();
-      LOG.info(String.format("Flushing profile: profile=%s, entity=%s", measurement.getProfileName(), measurement.getEntity()));
-
-      // execute the 'result' and 'group by' expressions
-      Object value = executeResult(definition.getResult(), executor, measurement);
-      measurement.setValue(value);
-
-      List<Object> groups = executeGroupBy(definition.getGroupBy(), executor, measurement);
-      measurement.setGroups(groups);
-
-      // emit the completed profile measurement
-      emit(measurement, definition);
-
-      // execute the update with the old state
-      Map<String, String> tickUpdate = definition.getTickUpdate();
-      Map<String, Object> state = executor.getState();
-      if(tickUpdate != null) {
-        tickUpdate.forEach((var, expr) -> executor.assign(var, expr, Collections.singletonMap("result", value)));
-      }
-
-      // clear the execution state to prepare for the next window
-      executor.clearState();
-
-      //make sure that we bring along the update state
-      if(tickUpdate != null) {
-        tickUpdate.forEach((var, expr) -> executor.getState().put(var, state.get(var)));
-      }
+      // when a 'tick' is received, flush the profile and emit the completed profile measurement
+      profileCache.asMap().forEach((key, profileBuilder) -> {
+        ProfileMeasurement measurement = profileBuilder.flush();
+        collector.emit(new Values(measurement, profileBuilder.getDefinition()));
+      });
 
       // cache maintenance
       profileCache.cleanUp();
-    });
-  }
-
-  /**
-   * Create the state necessary to build a Profile.
-   * @param tuple The tuple that needs applied to a profile.
-   */
-  private ProfileState createProfileState(Tuple tuple) {
-
-    // extract the profile definition
-    ProfileConfig profileDefinition = getProfileDefinition(tuple);
-
-    // create the profile measurement which will be emitted at the end of the window period
-    ProfileMeasurement measurement = new ProfileMeasurement(
-            profileDefinition.getProfile(),
-            getEntity(tuple),
-            getTimestamp(),
-            periodDurationMillis,
-            TimeUnit.MILLISECONDS);
-
-    // create the executor
-    StellarExecutor executor = new DefaultStellarExecutor();
-    Context context = new Context.Builder()
-            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-            .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
-            .build();
-    StellarFunctions.initialize(context);
-    executor.setContext(context);
-
-    // create the profile state which is maintained within a cache for a fixed period of time
-    ProfileState state = new ProfileState();
-    state.setExecutor(executor);
-    state.setDefinition(profileDefinition);
-    state.setMeasurement(measurement);
-
-    return state;
-  }
-
-  /**
-   * Executes the 'result' expression of a Profile.
-   * @return The result of evaluating the 'result' expression.
-   */
-  private Object executeResult(String expression, StellarExecutor executor, ProfileMeasurement measurement) {
-    Object result;
-    try {
-      result = executor.execute(expression, new JSONObject(), Object.class);
-
-    } catch(ParseException e) {
-      String msg = format("Bad 'result' expression: %s, profile=%s, entity=%s",
-              e.getMessage(), measurement.getProfileName(), measurement.getEntity());
-      throw new ParseException(msg, e);
-    }
-    return result;
-  }
 
+    } else {
 
-  /**
-   * Executes each of the 'groupBy' expressions.  The result of each
-   * expression are the groups used to sort the data as part of the
-   * row key.
-   * @param expressions The 'groupBy' expressions to execute.
-   * @return The result of executing the 'groupBy' expressions.
-   */
-  private List<Object> executeGroupBy(List<String> expressions, StellarExecutor executor, ProfileMeasurement measurement) {
-    List<Object> groups = new ArrayList<>();
-
-    if(!isEmpty(expressions)) {
-      try {
-        // allows each 'groupBy' expression to refer to the fields of the ProfileMeasurement
-        BeanMap measureAsMap = new BeanMap(measurement);
-
-        for (String expr : expressions) {
-          Object result = executor.execute(expr, measureAsMap, Object.class);
-          groups.add(result);
-        }
-
-      } catch(Throwable e) {
-        String msg = format("Bad 'groupBy' expression: %s, profile=%s, entity=%s",
-                e.getMessage(), measurement.getProfileName(), measurement.getEntity());
-        throw new ParseException(msg, e);
-      }
+      // telemetry message provides additional context for 'init' and 'update' expressions
+      JSONObject message = getField("message", input, JSONObject.class);
+      getBuilder(input).apply(message);
     }
-
-    return groups;
-  }
-
-  /**
-   * Emits a message containing a ProfileMeasurement and the Profile configuration.
-   * @param measurement The completed ProfileMeasurement.
-   * @param definition The profile definition.
-   */
-  private void emit(ProfileMeasurement measurement, ProfileConfig definition) {
-    collector.emit(new Values(measurement, definition));
   }
 
   /**
@@ -368,80 +177,44 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
    * @param tuple A tuple.
    */
   private String cacheKey(Tuple tuple) {
-    return String.format("%s:%s", getProfileDefinition(tuple).getProfile(), getEntity(tuple));
+    return format("%s:%s",
+            getField("profile", tuple, ProfileConfig.class),
+            getField("entity", tuple, String.class));
   }
 
   /**
-   * Retrieves the state associated with a Profile.  If none exists, the state will
-   * be initialized.
+   * Retrieves the cached ProfileBuilder that is used to build and maintain the Profile.  If none exists,
+   * one will be created and returned.
    * @param tuple The tuple.
    */
-  protected ProfileState getProfileState(Tuple tuple) throws ExecutionException {
-    return profileCache.get(cacheKey(tuple), () -> createProfileState(tuple));
-  }
-
-  /**
-   * Extracts the profile definition from a tuple.
-   * @param tuple The tuple sent by the splitter bolt.
-   */
-  private ProfileConfig getProfileDefinition(Tuple tuple) {
-    ProfileConfig definition = (ProfileConfig) tuple.getValueByField("profile");
-    if(definition == null) {
-      throw new IllegalStateException("invalid tuple received: missing profile definition");
-    }
-
-    return definition;
-  }
-
-  /**
-   * Extracts the name of the entity from a tuple.
-   * @param tuple The tuple sent by the splitter bolt.
-   */
-  private String getEntity(Tuple tuple) {
-    String entity = tuple.getStringByField("entity");
-    if(entity == null) {
-      throw new IllegalStateException("invalid tuple received: missing entity name");
+  protected ProfileBuilder getBuilder(Tuple tuple) throws ExecutionException {
+    return profileCache.get(
+            cacheKey(tuple),
+            () -> new ProfileBuilder.Builder()
+                    .withDefinition(getField("profile", tuple, ProfileConfig.class))
+                    .withEntity(getField("entity", tuple, String.class))
+                    .withPeriodDurationMillis(periodDurationMillis)
+                    .withGlobalConfiguration(getConfigurations().getGlobalConfig())
+                    .withZookeeperClient(client)
+                    .withClock(new WallClock())
+                    .build());
+  }
+
+  /**
+   * Retrieves an expected field from a Tuple.  If the field is missing an exception is thrown to
+   * indicate a fatal error.
+   * @param fieldName The name of the field.
+   * @param tuple The tuple from which to retrieve the field.
+   * @param clazz The type of the field value.
+   * @param <T> The type of the field value.
+   */
+  private <T> T getField(String fieldName, Tuple tuple, Class<T> clazz) {
+    T value = ConversionUtils.convert(tuple.getValueByField(fieldName), clazz);
+    if(value == null) {
+      throw new IllegalStateException(format("invalid tuple received: missing field '%s'", fieldName));
     }
 
-    return entity;
-  }
-
-  /**
-   * Extracts the original telemetry message from a tuple.
-   * @param input The tuple sent by the splitter bolt.
-   */
-  private JSONObject getMessage(Tuple input) {
-    JSONObject message = (JSONObject) input.getValueByField("message");
-    if(message == null) {
-      throw new IllegalStateException("invalid tuple received: missing message");
-    }
-
-    return message;
-  }
-
-  /**
-   * Returns a value that can be used as the current timestamp.  Allows subclasses
-   * to override, if necessary.
-   */
-  private long getTimestamp() {
-    return System.currentTimeMillis();
-  }
-
-  /**
-   * Has the Stellar execution environment already been initialized
-   * @return True, it it has been initialized.
-   */
-  private boolean isInitialized(Tuple tuple) {
-    return profileCache.getIfPresent(cacheKey(tuple)) != null;
-  }
-
-  /**
-   * Is this a tick tuple?
-   * @param tuple The tuple
-   */
-  protected static boolean isTickTuple(Tuple tuple) {
-    return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) &&
-            Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+    return value;
   }
 
   public ProfileBuilderBolt withPeriodDurationMillis(long periodDurationMillis) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
index fdea3e9..3e7b4d4 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
@@ -20,10 +20,7 @@
 
 package org.apache.metron.profiler.bolt;
 
-import org.apache.storm.tuple.Tuple;
-import org.apache.commons.beanutils.BeanMap;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.dsl.ParseException;
 import org.apache.metron.hbase.bolt.mapper.ColumnList;
 import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
 import org.apache.metron.profiler.ProfileMeasurement;
@@ -31,15 +28,10 @@ import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
 import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
 import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.profiler.stellar.StellarExecutor;
+import org.apache.storm.tuple.Tuple;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Optional;
 
-import static java.lang.String.format;
-import static org.apache.commons.collections.CollectionUtils.isEmpty;
-
 /**
  * An HbaseMapper that defines how a ProfileMeasurement is persisted within an HBase table.
  */

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/670f50ae/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java
deleted file mode 100644
index f6c48b4..0000000
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileState.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.stellar.StellarExecutor;
-
-/**
- * The state that must be maintained for each [profile, entity] pair when building a Profile.
- */
-public class ProfileState {
-
-  /**
-   * A ProfileMeasurement is created and emitted each window period.  A Profile
-   * itself is composed of many ProfileMeasurements.
-   */
-  private ProfileMeasurement measurement;
-
-  /**
-   * The definition of the Profile that the bolt is building.
-   */
-  private ProfileConfig definition;
-
-  /**
-   * Executes Stellar code and maintains state across multiple invocations.
-   */
-  private StellarExecutor executor;
-
-  public ProfileMeasurement getMeasurement() {
-    return measurement;
-  }
-
-  public void setMeasurement(ProfileMeasurement measurement) {
-    this.measurement = measurement;
-  }
-
-  public ProfileConfig getDefinition() {
-    return definition;
-  }
-
-  public void setDefinition(ProfileConfig definition) {
-    this.definition = definition;
-  }
-
-  public StellarExecutor getExecutor() {
-    return executor;
-  }
-
-  public void setExecutor(StellarExecutor executor) {
-    this.executor = executor;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    ProfileState that = (ProfileState) o;
-
-    if (measurement != null ? !measurement.equals(that.measurement) : that.measurement != null) return false;
-    if (definition != null ? !definition.equals(that.definition) : that.definition != null) return false;
-    return executor != null ? executor.equals(that.executor) : that.executor == null;
-
-  }
-
-  @Override
-  public int hashCode() {
-    int result = measurement != null ? measurement.hashCode() : 0;
-    result = 31 * result + (definition != null ? definition.hashCode() : 0);
-    result = 31 * result + (executor != null ? executor.hashCode() : 0);
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return "ProfileState{" +
-            "measurement=" + measurement +
-            ", definition=" + definition +
-            ", executor=" + executor +
-            '}';
-  }
-}