Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:49 UTC

[19/52] [abbrv] metron git commit: METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
new file mode 100644
index 0000000..b8949c5
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/FixedFrequencyFlushSignalTest.java
@@ -0,0 +1,71 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@code FixedFrequencyFlushSignal} class.
+ */
+public class FixedFrequencyFlushSignalTest {
+
+  @Test
+  public void testSignalFlush() {
+
+    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+
+    // not time to flush yet
+    assertFalse(signal.isTimeToFlush());
+
+    // advance time
+    signal.update(5000);
+
+    // not time to flush yet
+    assertFalse(signal.isTimeToFlush());
+
+    // advance time
+    signal.update(7000);
+
+    // time to flush
+    assertTrue(signal.isTimeToFlush());
+  }
+
+  @Test
+  public void testOutOfOrderTimestamps() {
+    FixedFrequencyFlushSignal signal = new FixedFrequencyFlushSignal(1000);
+
+    // advance time, out-of-order
+    signal.update(5000);
+    signal.update(1000);
+    signal.update(7000);
+    signal.update(3000);
+
+    // a flush is due at 5000 + 1000 = 6000; any timestamp > 6000 (even one arriving out-of-order) should signal a flush
+    assertTrue(signal.isTimeToFlush());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNegativeFrequency() {
+    new FixedFrequencyFlushSignal(-1000);
+  }
+}
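
For readers following the event-time change, the tests above pin down the intended behavior: the first timestamp seen establishes the flush time (first timestamp plus the flush frequency), and any later timestamp past that point signals a flush, even when timestamps arrive out of order. Below is a minimal sketch consistent with these tests; the field names and the FlushSignal interface methods shown here are assumptions, not the committed Metron source.

    // a sketch only; assumes a FlushSignal interface with update/isTimeToFlush/reset
    public class FixedFrequencyFlushSignal implements FlushSignal {

      private long flushFrequency;
      private long flushTime;
      private long currentTime;

      public FixedFrequencyFlushSignal(long flushFrequencyMillis) {
        if (flushFrequencyMillis < 0) {
          throw new IllegalArgumentException("flush frequency must be >= 0");
        }
        this.flushFrequency = flushFrequencyMillis;
        reset();
      }

      @Override
      public void reset() {
        flushTime = 0;
        currentTime = 0;
      }

      @Override
      public void update(long timestamp) {
        // track the maximum timestamp seen, even when updates arrive out of order
        if (timestamp > currentTime) {
          currentTime = timestamp;
        }
        // the first timestamp seen fixes when the next flush is due
        if (flushTime == 0) {
          flushTime = timestamp + flushFrequency;
        }
      }

      @Override
      public boolean isTimeToFlush() {
        return currentTime > flushTime;
      }
    }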

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
deleted file mode 100644
index c3f2584..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.bolt;
-
-import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.statistics.OnlineStatisticsProvider;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Values;
-import org.json.simple.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-/**
- * Tests the KafkaDestinationHandler.
- */
-public class KafkaDestinationHandlerTest {
-
-  /**
-   * {
-   *   "profile": "profile-one-destination",
-   *   "foreach": "ip_src_addr",
-   *   "init":   { "x": "0" },
-   *   "update": { "x": "x + 1" },
-   *   "result": "x"
-   * }
-   */
-  @Multiline
-  private String profileDefinition;
-
-  private KafkaDestinationHandler handler;
-  private ProfileConfig profile;
-  private OutputCollector collector;
-
-  @Before
-  public void setup() throws Exception {
-    handler = new KafkaDestinationHandler();
-    profile = createDefinition(profileDefinition);
-    collector = Mockito.mock(OutputCollector.class);
-  }
-
-  /**
-   * The handler must serialize the ProfileMeasurement into a JSONObject.
-   */
-  @Test
-  public void testSerialization() throws Exception {
-
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(Collections.singletonMap("triage-key", "triage-value"))
-            .withDefinition(profile);
-    handler.emit(measurement, collector);
-
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
-
-    // expect a JSONObject
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof JSONObject);
-
-    // validate the json
-    JSONObject actual = (JSONObject) values.get(0);
-    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
-    assertEquals(measurement.getEntity(), actual.get("entity"));
-    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
-    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
-    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
-    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
-    assertNotNull(actual.get("timestamp"));
-    assertEquals("profiler", actual.get("source.type"));
-  }
-
-  /**
-   * Values destined for Kafka can only be serialized into text, which limits the types of values
-   * that can result from a triage expression.  Only primitive types and Strings are allowed.
-   */
-  @Test
-  public void testInvalidType() throws Exception {
-
-    // create one invalid expression and one valid expression
-    Map<String, Object> triageValues = ImmutableMap.of(
-            "invalid", new OnlineStatisticsProvider(),
-            "valid", 4);
-
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(triageValues)
-            .withDefinition(profile);
-    handler.emit(measurement, collector);
-
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof JSONObject);
-
-    // only the triage expression value itself should have been skipped, all others should be there
-    JSONObject actual = (JSONObject) values.get(0);
-    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
-    assertEquals(measurement.getEntity(), actual.get("entity"));
-    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
-    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
-    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
-    assertNotNull(actual.get("timestamp"));
-    assertEquals("profiler", actual.get("source.type"));
-
-    // the invalid expression should be skipped due to invalid type
-    assertFalse(actual.containsKey("invalid"));
-
-    // but the valid expression should still be there
-    assertEquals(triageValues.get("valid"), actual.get("valid"));
-  }
-
-  /**
-   * Values destined for Kafka can only be serialized into text, which limits the types of values
-   * that can result from a triage expression.  Only primitive types and Strings are allowed.
-   */
-  @Test
-  public void testIntegerIsValidType() throws Exception {
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(Collections.singletonMap("triage-key", 123))
-            .withDefinition(profile);
-    handler.emit(measurement, collector);
-
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof JSONObject);
-    JSONObject actual = (JSONObject) values.get(0);
-
-    // the triage expression is valid
-    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
-  }
-
-  /**
-   * Values destined for Kafka can only be serialized into text, which limits the types of values
-   * that can result from a triage expression.  Only primitive types and Strings are allowed.
-   */
-  @Test
-  public void testStringIsValidType() throws Exception {
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName("profile")
-            .withEntity("entity")
-            .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withTriageValues(Collections.singletonMap("triage-key", "value"))
-            .withDefinition(profile);
-    handler.emit(measurement, collector);
-
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
-    Values values = arg.getValue();
-    assertTrue(values.get(0) instanceof JSONObject);
-    JSONObject actual = (JSONObject) values.get(0);
-
-    // the triage expression is valid
-    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
-  }
-
-  /**
-   * Creates a profile definition based on a string of JSON.
-   * @param json The string of JSON.
-   */
-  private ProfileConfig createDefinition(String json) throws IOException {
-    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
new file mode 100644
index 0000000..b02e377
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaEmitterTest.java
@@ -0,0 +1,208 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests the {@code KafkaEmitter} class.
+ */
+public class KafkaEmitterTest {
+
+  /**
+   * {
+   *   "profile": "profile-one-destination",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String profileDefinition;
+
+  private KafkaEmitter handler;
+  private ProfileConfig profile;
+  private OutputCollector collector;
+
+  @Before
+  public void setup() throws Exception {
+    handler = new KafkaEmitter();
+    profile = createDefinition(profileDefinition);
+    collector = Mockito.mock(OutputCollector.class);
+  }
+
+  /**
+   * The handler must serialize the ProfileMeasurement into a JSONObject.
+   */
+  @Test
+  public void testSerialization() throws Exception {
+
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", "triage-value"))
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+
+    // expect a JSONObject
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+
+    // validate the json
+    JSONObject actual = (JSONObject) values.get(0);
+    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
+    assertEquals(measurement.getEntity(), actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+    assertNotNull(actual.get("timestamp"));
+    assertEquals("profiler", actual.get("source.type"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits the types of values
+   * that can result from a triage expression.  Only primitive types and Strings are allowed.
+   */
+  @Test
+  public void testInvalidType() throws Exception {
+
+    // create one invalid expression and one valid expression
+    Map<String, Object> triageValues = ImmutableMap.of(
+            "invalid", new OnlineStatisticsProvider(),
+            "valid", 4);
+
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(triageValues)
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+
+    // only the triage expression value itself should have been skipped, all others should be there
+    JSONObject actual = (JSONObject) values.get(0);
+    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
+    assertEquals(measurement.getEntity(), actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+    assertNotNull(actual.get("timestamp"));
+    assertEquals("profiler", actual.get("source.type"));
+
+    // the invalid expression should be skipped due to invalid type
+    assertFalse(actual.containsKey("invalid"));
+
+    // but the valid expression should still be there
+    assertEquals(triageValues.get("valid"), actual.get("valid"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits the types of values
+   * that can result from a triage expression.  Only primitive types and Strings are allowed.
+   */
+  @Test
+  public void testIntegerIsValidType() throws Exception {
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", 123))
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+    JSONObject actual = (JSONObject) values.get(0);
+
+    // the triage expression is valid
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits the types of values
+   * that can result from a triage expression.  Only primitive types and Strings are allowed.
+   */
+  @Test
+  public void testStringIsValidType() throws Exception {
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", "value"))
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+    JSONObject actual = (JSONObject) values.get(0);
+
+    // the triage expression is valid
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+  }
+
+  /**
+   * Creates a profile definition based on a string of JSON.
+   * @param json The string of JSON.
+   */
+  private ProfileConfig createDefinition(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+  }
+}
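
The three type-validation tests above imply a guard inside the emitter that drops any triage value that is neither a String nor a primitive wrapper before the measurement is serialized to JSON. The following sketch shows that guard; the class and method names here are hypothetical, not the committed KafkaEmitter source.

    import org.apache.commons.lang3.ClassUtils;
    import org.json.simple.JSONObject;

    import java.util.Map;

    // hypothetical helper sketching the type check that the tests above exercise
    public class TriageValueFilter {

      // only Strings and primitive wrappers survive serialization to text for Kafka
      public static boolean isValidType(Object value) {
        return value != null
            && (value instanceof String || ClassUtils.isPrimitiveOrWrapper(value.getClass()));
      }

      // copies only the serializable triage values onto the outgoing JSON message;
      // anything else, like an OnlineStatisticsProvider, is silently skipped
      @SuppressWarnings("unchecked")
      public static void appendTriageValues(Map<String, Object> triageValues, JSONObject message) {
        triageValues.forEach((key, value) -> {
          if (isValidType(value)) {
            message.put(key, value);
          }
        });
      }
    }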

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 21d61ab..78e20e0 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -20,35 +20,37 @@
 
 package org.apache.metron.profiler.bolt;
 
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
+import org.apache.metron.profiler.MessageDistributor;
 import org.apache.metron.profiler.MessageRoute;
-import org.apache.metron.profiler.ProfileBuilder;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.profiler.integration.MessageBuilder;
 import org.apache.metron.test.bolt.BaseBoltTest;
-import org.apache.storm.Constants;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseWindowedBolt;
+import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
+import org.apache.storm.windowing.TupleWindow;
 import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
-import static org.apache.metron.stellar.common.utils.ConversionUtils.convert;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -59,284 +61,348 @@ import static org.mockito.Mockito.when;
  */
 public class ProfileBuilderBoltTest extends BaseBoltTest {
 
-  /**
-   * {
-   *   "ip_src_addr": "10.0.0.1",
-   *   "value": "22"
-   * }
-   */
-  @Multiline
-  private String inputOne;
-  private JSONObject messageOne;
+  private JSONObject message1;
+  private JSONObject message2;
+  private ProfileConfig profile1;
+  private ProfileConfig profile2;
+  private ProfileMeasurementEmitter emitter;
+  private ManualFlushSignal flushSignal;
 
-  /**
-   * {
-   *   "ip_src_addr": "10.0.0.2",
-   *   "value": "22"
-   * }
-   */
-  @Multiline
-  private String inputTwo;
-  private JSONObject messageTwo;
+  @Before
+  public void setup() throws Exception {
+
+    message1 = new MessageBuilder()
+            .withField("ip_src_addr", "10.0.0.1")
+            .withField("value", "22")
+            .build();
+
+    message2 = new MessageBuilder()
+            .withField("ip_src_addr", "10.0.0.2")
+            .withField("value", "22")
+            .build();
+
+    profile1 = new ProfileConfig()
+            .withProfile("profile1")
+            .withForeach("ip_src_addr")
+            .withInit("x", "0")
+            .withUpdate("x", "x + 1")
+            .withResult("x");
+
+    profile2 = new ProfileConfig()
+            .withProfile("profile2")
+            .withForeach("ip_src_addr")
+            .withInit(Collections.singletonMap("x", "0"))
+            .withUpdate(Collections.singletonMap("x", "x + 1"))
+            .withResult("x");
+
+    flushSignal = new ManualFlushSignal();
+    flushSignal.setFlushNow(false);
+  }
 
   /**
-   * {
-   *   "profile": "profileOne",
-   *   "foreach": "ip_src_addr",
-   *   "init":   { "x": "0" },
-   *   "update": { "x": "x + 1" },
-   *   "result": "x"
-   * }
+   * The bolt should extract a message and timestamp from a tuple and
+   * pass that to a {@code MessageDistributor}.
    */
-  @Multiline
-  private String profileOne;
+  @Test
+  public void testExtractMessage() throws Exception {
 
+    ProfileBuilderBolt bolt = createBolt();
 
-  /**
-   * {
-   *   "profile": "profileTwo",
-   *   "foreach": "ip_src_addr",
-   *   "init":   { "x": "0" },
-   *   "update": { "x": "x + 1" },
-   *   "result": "x"
-   * }
-   */
-  @Multiline
-  private String profileTwo;
+    // create a mock
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    bolt.withMessageDistributor(distributor);
 
-  public static Tuple mockTickTuple() {
-    Tuple tuple = mock(Tuple.class);
-    when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
-    when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
-    return tuple;
-  }
+    // create a tuple
+    final long timestamp1 = 100000000L;
+    Tuple tuple1 = createTuple("entity1", message1, profile1, timestamp1);
 
-  @Before
-  public void setup() throws Exception {
-    JSONParser parser = new JSONParser();
-    messageOne = (JSONObject) parser.parse(inputOne);
-    messageTwo = (JSONObject) parser.parse(inputTwo);
+    // execute the bolt
+    TupleWindow tupleWindow = createWindow(tuple1);
+    bolt.execute(tupleWindow);
+
+    // the message should have been extracted from the tuple and passed to the MessageDistributor
+    verify(distributor).distribute(eq(message1), eq(timestamp1), any(MessageRoute.class), any());
   }
 
+
   /**
-   * Creates a profile definition based on a string of JSON.
-   * @param json The string of JSON.
+   * If the {@code FlushSignal} tells the bolt to flush, it should flush the {@code MessageDistributor}
+   * and emit the {@code ProfileMeasurement} values.
    */
-  private ProfileConfig createDefinition(String json) throws IOException {
-    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+  @Test
+  public void testEmitWhenFlush() throws Exception {
+
+    ProfileBuilderBolt bolt = createBolt();
+
+    // create a profile measurement
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withEntity("entity1")
+            .withProfileName("profile1")
+            .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
+            .withProfileValue(22);
+
+    // create a mock that returns the profile measurement above
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    when(distributor.flush()).thenReturn(Collections.singletonList(m));
+    bolt.withMessageDistributor(distributor);
+
+    // signal the bolt to flush
+    flushSignal.setFlushNow(true);
+
+    // execute the bolt
+    Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
+    TupleWindow tupleWindow = createWindow(tuple1);
+    bolt.execute(tupleWindow);
+
+    // a profile measurement should be emitted by the bolt
+    List<ProfileMeasurement> measurements = getProfileMeasurements(outputCollector, 1);
+    assertEquals(1, measurements.size());
+    assertEquals(m, measurements.get(0));
   }
 
   /**
-   * Create a tuple that will contain the message, the entity name, and profile definition.
-   * @param entity The entity name
-   * @param message The telemetry message.
-   * @param profile The profile definition.
+   * If the {@code FlushSignal} tells the bolt NOT to flush, nothing should be emitted.
    */
-  private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile) {
-    Tuple tuple = mock(Tuple.class);
-    when(tuple.getValueByField(eq("message"))).thenReturn(message);
-    when(tuple.getValueByField(eq("entity"))).thenReturn(entity);
-    when(tuple.getValueByField(eq("profile"))).thenReturn(profile);
-    return tuple;
+  @Test
+  public void testDoNotEmitWhenNoFlush() throws Exception {
+
+    ProfileBuilderBolt bolt = createBolt();
+
+    // create a profile measurement
+    ProfileMeasurement m = new ProfileMeasurement()
+            .withEntity("entity1")
+            .withProfileName("profile1")
+            .withPeriod(1000, 500, TimeUnit.MILLISECONDS)
+            .withProfileValue(22);
+
+    // create a mock that returns the profile measurement above
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    when(distributor.flush()).thenReturn(Collections.singletonList(m));
+    bolt.withMessageDistributor(distributor);
+
+    // no flush signal
+    flushSignal.setFlushNow(false);
+
+    // execute the bolt
+    Tuple tuple1 = createTuple("entity1", message1, profile1, 1000L);
+    TupleWindow tupleWindow = createWindow(tuple1);
+    bolt.execute(tupleWindow);
+
+    // nothing should have been emitted
+    getProfileMeasurements(outputCollector, 0);
   }
 
   /**
-   * Create a ProfileBuilderBolt to test
+   * A {@link ProfileMeasurement} is built for each profile/entity pair.  The measurement should be emitted to each
+   * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
    */
-  private ProfileBuilderBolt createBolt() throws IOException {
+  @Test
+  public void testEmitters() throws Exception {
+
+    // defines the zk configurations accessible from the bolt
+    ProfilerConfigurations configurations = new ProfilerConfigurations();
+    configurations.updateGlobalConfig(Collections.emptyMap());
+
+    // create the bolt with 3 destinations
+    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
+            .withProfileTimeToLive(30, TimeUnit.MINUTES)
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withMaxNumberOfRoutes(Long.MAX_VALUE)
+            .withZookeeperClient(client)
+            .withZookeeperCache(cache)
+            .withEmitter(new TestEmitter("destination1"))
+            .withEmitter(new TestEmitter("destination2"))
+            .withEmitter(new TestEmitter("destination3"))
+            .withProfilerConfigurations(configurations)
+            .withTumblingWindow(new BaseWindowedBolt.Duration(10, TimeUnit.MINUTES));
+    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
 
-    ProfileBuilderBolt bolt = new ProfileBuilderBolt("zookeeperURL");
-    bolt.setCuratorFramework(client);
-    bolt.setZKCache(cache);
-    bolt.withPeriodDuration(10, TimeUnit.MINUTES);
-    bolt.withProfileTimeToLive(30, TimeUnit.MINUTES);
+    // signal the bolt to flush
+    bolt.withFlushSignal(flushSignal);
+    flushSignal.setFlushNow(true);
 
-    // define the valid destinations for the profiler
-    bolt.withDestinationHandler(new HBaseDestinationHandler());
-    bolt.withDestinationHandler(new KafkaDestinationHandler());
+    // execute the bolt
+    Tuple tuple1 = createTuple("entity", message1, profile1, System.currentTimeMillis());
+    TupleWindow window = createWindow(tuple1);
+    bolt.execute(window);
 
-    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
-    return bolt;
+    // validate measurements emitted to each
+    verify(outputCollector, times(1)).emit(eq("destination1"), any());
+    verify(outputCollector, times(1)).emit(eq("destination2"), any());
+    verify(outputCollector, times(1)).emit(eq("destination3"), any());
   }
 
-  /**
-   * The bolt should create a ProfileBuilder to manage a profile.
-   */
   @Test
-  public void testCreateProfileBuilder() throws Exception {
+  public void testFlushExpiredWithTick() throws Exception {
 
     ProfileBuilderBolt bolt = createBolt();
-    ProfileConfig definition = createDefinition(profileOne);
-    String entity = (String) messageOne.get("ip_src_addr");
-    Tuple tupleOne = createTuple(entity, messageOne, definition);
 
-    // execute - send two tuples with different entities
-    bolt.execute(tupleOne);
+    // create a mock
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    bolt.withMessageDistributor(distributor);
+
+    // tell the bolt to flush on the first window
+    flushSignal.setFlushNow(true);
 
-    // validate - 1 messages applied
-    MessageRoute route = new MessageRoute(definition, entity);
-    ProfileBuilder builderOne = bolt.getMessageDistributor().getBuilder(route, Context.EMPTY_CONTEXT());
-    assertEquals(1, (int) convert(builderOne.valueOf("x"), Integer.class));
+    // execute the bolt; include a tick tuple in the window
+    Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L);
+    TupleWindow tupleWindow = createWindow(tuple1, mockTickTuple());
+    bolt.execute(tupleWindow);
+
+    // ensure the expired profiles were flushed when the tick tuple was received
+    verify(distributor).flushExpired();
   }
 
-  /**
-   * This test creates two different messages, with different entities that are applied to
-   * the same profile.  The bolt should create separate ProfileBuilder objects to handle each
-   * profile/entity pair.
-   */
   @Test
-  public void testCreateProfileBuilderForEachEntity() throws Exception {
+  public void testFlushExpiredWithNoTick() throws Exception {
 
-    // setup
     ProfileBuilderBolt bolt = createBolt();
-    ProfileConfig definition = createDefinition(profileOne);
-
-    // apply a message to the profile
-    String entityOne = (String) messageOne.get("ip_src_addr");
-    Tuple tupleOne = createTuple(entityOne, messageOne, definition);
-    bolt.execute(tupleOne);
-    bolt.execute(tupleOne);
-
-    // apply a different message (with different entity) to the same profile
-    String entityTwo = (String) messageTwo.get("ip_src_addr");
-    Tuple tupleTwo = createTuple(entityTwo, messageTwo, definition);
-    bolt.execute(tupleTwo);
-
-    // validate - 2 messages applied
-    MessageRoute routeOne = new MessageRoute(definition, entityOne);
-    ProfileBuilder builderOne = bolt.getMessageDistributor().getBuilder(routeOne, Context.EMPTY_CONTEXT());
-    assertTrue(builderOne.isInitialized());
-    assertEquals(2, (int) convert(builderOne.valueOf("x"), Integer.class));
-
-    // validate - 1 message applied
-    MessageRoute routeTwo = new MessageRoute(definition, entityTwo);
-    ProfileBuilder builderTwo = bolt.getMessageDistributor().getBuilder(routeTwo, Context.EMPTY_CONTEXT());
-    assertTrue(builderTwo.isInitialized());
-    assertEquals(1, (int) convert(builderTwo.valueOf("x"), Integer.class));
-
-    assertNotSame(builderOne, builderTwo);
+
+    // create a mock
+    MessageDistributor distributor = mock(MessageDistributor.class);
+    bolt.withMessageDistributor(distributor);
+
+    // tell the bolt to flush on the first window
+    flushSignal.setFlushNow(true);
+
+    // execute the bolt; NO tick tuple
+    Tuple tuple1 = createTuple("entity", message1, profile1, 100000000L);
+    TupleWindow tupleWindow = createWindow(tuple1);
+    bolt.execute(tupleWindow);
+
+    // there was no tick tuple; the expired profiles should NOT have been flushed
+    verify(distributor, times(0)).flushExpired();
   }
 
   /**
-   * The bolt should create separate ProfileBuilder objects to handle each
-   * profile/entity pair.
+   * Creates a mock tick tuple to use for testing.
+   * @return A mock tick tuple.
    */
-  @Test
-  public void testCreateProfileBuilderForEachProfile() throws Exception {
+  private Tuple mockTickTuple() {
 
-    // setup - apply one message to different profile definitions
-    ProfileBuilderBolt bolt = createBolt();
-    String entity = (String) messageOne.get("ip_src_addr");
-
-    // apply a message to the first profile
-    ProfileConfig definitionOne = createDefinition(profileOne);
-    Tuple tupleOne = createTuple(entity, messageOne, definitionOne);
-    bolt.execute(tupleOne);
-
-    // apply the same message to the second profile
-    ProfileConfig definitionTwo = createDefinition(profileTwo);
-    Tuple tupleTwo = createTuple(entity, messageOne, definitionTwo);
-    bolt.execute(tupleTwo);
-
-    // validate - 1 message applied
-    MessageRoute routeOne = new MessageRoute(definitionOne, entity);
-    ProfileBuilder builderOne = bolt.getMessageDistributor().getBuilder(routeOne, Context.EMPTY_CONTEXT());
-    assertTrue(builderOne.isInitialized());
-    assertEquals(1, (int) convert(builderOne.valueOf("x"), Integer.class));
-
-    // validate - 1 message applied
-    MessageRoute routeTwo = new MessageRoute(definitionTwo, entity);
-    ProfileBuilder builderTwo = bolt.getMessageDistributor().getBuilder(routeTwo, Context.EMPTY_CONTEXT());
-    assertTrue(builderTwo.isInitialized());
-    assertEquals(1, (int) convert(builderTwo.valueOf("x"), Integer.class));
-
-    assertNotSame(builderOne, builderTwo);
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getSourceComponent()).thenReturn("__system");
+    when(tuple.getSourceStreamId()).thenReturn("__tick");
+
+    return tuple;
   }
 
   /**
-   * A ProfileMeasurement is build for each profile/entity pair.  A measurement for each profile/entity
-   * pair should be emitted.
+   * Retrieves the ProfileMeasurement(s) (if any) that have been emitted.
+   *
+   * @param collector The Storm output collector.
+   * @param expected The number of measurements expected.
+   * @return A list of ProfileMeasurement(s).
    */
-  @Test
-  public void testEmitMeasurements() throws Exception {
-
-    // setup
-    ProfileBuilderBolt bolt = createBolt();
-    final String entity = (String) messageOne.get("ip_src_addr");
+  private List<ProfileMeasurement> getProfileMeasurements(OutputCollector collector, int expected) {
 
-    // apply the message to the first profile
-    ProfileConfig definitionOne = createDefinition(profileOne);
-    Tuple tupleOne = createTuple(entity, messageOne, definitionOne);
-    bolt.execute(tupleOne);
+    // the 'streamId' is defined by the ProfileMeasurementEmitter being used by the bolt
+    final String streamId = emitter.getStreamId();
 
-    // apply the same message to the second profile
-    ProfileConfig definitionTwo = createDefinition(profileTwo);
-    Tuple tupleTwo = createTuple(entity, messageOne, definitionTwo);
-    bolt.execute(tupleTwo);
+    // capture the emitted tuple(s)
+    ArgumentCaptor<Values> argCaptor = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(expected))
+            .emit(eq(streamId), argCaptor.capture());
 
-    // execute - the tick tuple triggers a flush of the profile
-    bolt.execute(mockTickTuple());
+    // return the profile measurements that were emitted
+    return argCaptor.getAllValues()
+            .stream()
+            .map(val -> (ProfileMeasurement) val.get(0))
+            .collect(Collectors.toList());
+  }
 
-    // capture the ProfileMeasurement that should be emitted
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+  /**
+   * Create a tuple that will contain the message, the timestamp, the entity name, and the profile definition.
+   * @param entity The entity name.
+   * @param message The telemetry message.
+   * @param profile The profile definition.
+   * @param timestamp The timestamp of the telemetry message.
+   */
+  private Tuple createTuple(String entity, JSONObject message, ProfileConfig profile, long timestamp) {
 
-    // validate emitted measurements for hbase
-    verify(outputCollector, atLeastOnce()).emit(eq("hbase"), arg.capture());
-    for (Values value : arg.getAllValues()) {
+    Tuple tuple = mock(Tuple.class);
+    when(tuple.getValueByField(eq(ProfileSplitterBolt.MESSAGE_TUPLE_FIELD))).thenReturn(message);
+    when(tuple.getValueByField(eq(ProfileSplitterBolt.TIMESTAMP_TUPLE_FIELD))).thenReturn(timestamp);
+    when(tuple.getValueByField(eq(ProfileSplitterBolt.ENTITY_TUPLE_FIELD))).thenReturn(entity);
+    when(tuple.getValueByField(eq(ProfileSplitterBolt.PROFILE_TUPLE_FIELD))).thenReturn(profile);
 
-      ProfileMeasurement measurement = (ProfileMeasurement) value.get(0);
-      ProfileConfig definition = measurement.getDefinition();
+    return tuple;
+  }
 
-      if (StringUtils.equals(definitionTwo.getProfile(), definition.getProfile())) {
+  /**
+   * Create a ProfileBuilderBolt to test.
+   * @return A {@link ProfileBuilderBolt} to test.
+   */
+  private ProfileBuilderBolt createBolt() throws IOException {
 
-        // validate measurement emitted for profile two
-        assertEquals(definitionTwo, definition);
-        assertEquals(entity, measurement.getEntity());
-        assertEquals(definitionTwo.getProfile(), measurement.getProfileName());
-        assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class));
+    return createBolt(30, TimeUnit.SECONDS);
+  }
 
-      } else if (StringUtils.equals(definitionOne.getProfile(), definition.getProfile())) {
+  /**
+   * Create a ProfileBuilderBolt to test.
+   *
+   * @param windowDuration The event window duration.
+   * @param windowDurationUnits The units of the event window duration.
+   * @return A {@link ProfileBuilderBolt} to test.
+   */
+  private ProfileBuilderBolt createBolt(int windowDuration, TimeUnit windowDurationUnits) throws IOException {
+
+    // defines the zk configurations accessible from the bolt
+    ProfilerConfigurations configurations = new ProfilerConfigurations();
+    configurations.updateGlobalConfig(Collections.emptyMap());
+
+    emitter = new HBaseEmitter();
+    ProfileBuilderBolt bolt = (ProfileBuilderBolt) new ProfileBuilderBolt()
+            .withProfileTimeToLive(30, TimeUnit.MINUTES)
+            .withMaxNumberOfRoutes(Long.MAX_VALUE)
+            .withZookeeperClient(client)
+            .withZookeeperCache(cache)
+            .withEmitter(emitter)
+            .withProfilerConfigurations(configurations)
+            .withPeriodDuration(1, TimeUnit.MINUTES)
+            .withTumblingWindow(new BaseWindowedBolt.Duration(windowDuration, windowDurationUnits));
+    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
 
-        // validate measurement emitted for profile one
-        assertEquals(definitionOne, definition);
-        assertEquals(entity, measurement.getEntity());
-        assertEquals(definitionOne.getProfile(), measurement.getProfileName());
-        assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class));
+    // set the flush signal AFTER calling 'prepare'
+    bolt.withFlushSignal(flushSignal);
 
-      } else {
-        fail();
-      }
-    }
+    return bolt;
   }
 
   /**
-   * A ProfileMeasurement is build for each profile/entity pair.  The measurement should be emitted to each
-   * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
+   * Creates a mock TupleWindow containing multiple tuples.
+   * @param tuples The tuples to add to the window.
    */
-  @Test
-  public void testDestinationHandlers() throws Exception {
+  private TupleWindow createWindow(Tuple... tuples) {
 
-    // setup
-    ProfileBuilderBolt bolt = createBolt();
-    ProfileConfig definitionOne = createDefinition(profileOne);
+    TupleWindow window = mock(TupleWindow.class);
+    when(window.get()).thenReturn(Arrays.asList(tuples));
+    return window;
+  }
 
-    // apply the message to the first profile
-    final String entity = (String) messageOne.get("ip_src_addr");
-    Tuple tupleOne = createTuple(entity, messageOne, definitionOne);
-    bolt.execute(tupleOne);
+  /**
+   * A {@code ProfileMeasurementEmitter} implementation for testing purposes only.
+   */
+  private class TestEmitter implements ProfileMeasurementEmitter {
 
-    // trigger a flush of the profile
-    bolt.execute(mockTickTuple());
+    private String streamId;
 
-    // capture the values that should be emitted
-    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    public TestEmitter(String streamId) {
+      this.streamId = streamId;
+    }
 
-    // validate measurements emitted to HBase
-    verify(outputCollector, times(1)).emit(eq("hbase"), arg.capture());
-    assertTrue(arg.getValue().get(0) instanceof ProfileMeasurement);
+    @Override
+    public String getStreamId() {
+      return streamId;
+    }
 
-    // validate measurements emitted to Kafka
-    verify(outputCollector, times(1)).emit(eq("kafka"), arg.capture());
-    assertTrue(arg.getValue().get(0) instanceof JSONObject);
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declareStream(getStreamId(), new Fields("measurement"));
+    }
+
+    @Override
+    public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+      collector.emit(getStreamId(), new Values(measurement));
+    }
   }
 }
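
The tests above rely on a ManualFlushSignal helper that this patch references but does not show. Below is a sketch consistent with its usage here, assuming the same FlushSignal contract as in the first file of this patch; the actual test helper may differ.

    // a test-only FlushSignal that flushes exactly when told to; a sketch, not the committed helper
    public class ManualFlushSignal implements FlushSignal {

      private boolean flushNow = false;

      public void setFlushNow(boolean flushNow) {
        this.flushNow = flushNow;
      }

      @Override
      public boolean isTimeToFlush() {
        // flush only when the test has explicitly asked for it
        return flushNow;
      }

      @Override
      public void update(long timestamp) {
        // timestamps are irrelevant; flushes are triggered manually by the test
      }

      @Override
      public void reset() {
        // nothing to reset
      }
    }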

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
index 17d6827..04c774c 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
@@ -20,11 +20,11 @@
 
 package org.apache.metron.profiler.bolt;
 
-import org.apache.metron.common.configuration.profiler.ProfileResult;
-import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfileResult;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.storm.tuple.Tuple;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -32,10 +32,8 @@ import org.junit.Test;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index beab8d5..bf81923 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -21,7 +21,10 @@
 package org.apache.metron.profiler.bolt;
 
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.profiler.clock.FixedClockFactory;
+import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
@@ -31,12 +34,15 @@ import org.json.simple.parser.ParseException;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 import java.util.HashMap;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.refEq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests the ProfileSplitterBolt.
@@ -47,7 +53,9 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    * {
    *   "ip_src_addr": "10.0.0.1",
    *   "ip_dst_addr": "10.0.0.20",
-   *   "protocol": "HTTP"
+   *   "protocol": "HTTP",
+   *   "timestamp.custom": 2222222222222,
+   *   "timestamp.string": "3333333333333"
    * }
    */
   @Multiline
@@ -68,7 +76,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    * }
    */
   @Multiline
-  private String onlyIfTrue;
+  private String profileWithOnlyIfTrue;
 
   /**
    * {
@@ -85,7 +93,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    * }
    */
   @Multiline
-  private String onlyIfFalse;
+  private String profileWithOnlyIfFalse;
 
   /**
    * {
@@ -101,7 +109,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    * }
    */
   @Multiline
-  private String onlyIfMissing;
+  private String profileWithOnlyIfMissing;
 
   /**
    * {
@@ -118,9 +126,89 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    * }
    */
   @Multiline
-  private String onlyIfInvalid;
+  private String profileWithOnlyIfInvalid;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ],
+   *   "timestampField": "timestamp.custom"
+   * }
+   */
+  @Multiline
+  private String profileUsingCustomTimestampField;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ],
+   *   "timestampField": "timestamp.missing"
+   * }
+   */
+  @Multiline
+  private String profileUsingMissingTimestampField;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "test",
+   *        "foreach": "ip_src_addr",
+   *        "init": {},
+   *        "update": {},
+   *        "result": "2"
+   *      }
+   *   ],
+   *   "timestampField": "timestamp.string"
+   * }
+   */
+  @Multiline
+  private String profileUsingStringTimestampField;
+
+  /**
+   * {
+   *   "profiles": [
+   *   ]
+   * }
+   */
+  @Multiline
+  private String noProfilesDefined;
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "'global'",
+   *        "result": "1"
+   *      },
+   *      {
+   *        "profile": "profile2",
+   *        "foreach": "'global'",
+   *        "result": "2"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String twoProfilesDefined;
 
   private JSONObject message;
+  private long timestamp = 3333333;
 
   @Before
   public void setup() throws ParseException {
@@ -134,17 +222,83 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
   }
 
   /**
-   * Create a ProfileSplitterBolt to test
+   * Ensure that a tuple with the correct fields is emitted to downstream bolts
+   * when a profile is defined.
    */
-  private ProfileSplitterBolt createBolt(String profilerConfig) throws IOException {
+  @Test
+  public void testEmitTupleWithOneProfile() throws Exception {
 
-    ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
-    bolt.setCuratorFramework(client);
-    bolt.setZKCache(cache);
-    bolt.getConfigurations().updateProfilerConfig(profilerConfig.getBytes("UTF-8"));
-    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+    // setup the bolt and execute a tuple
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
 
-    return bolt;
+    // the expected tuple fields
+    String expectedEntity = "10.0.0.1";
+    ProfileConfig expectedConfig = config.getProfiles().get(0);
+    Values expected = new Values(message, timestamp, expectedEntity, expectedConfig);
+
+    // a tuple should be emitted for the downstream profile builder
+    verify(outputCollector, times(1))
+            .emit(eq(tuple), eq(expected));
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
+  }
+
+  /**
+   * If there are two profiles that need the same message, then two tuples should
+   * be emitted.  One tuple for each profile.
+   */
+  @Test
+  public void testEmitTupleWithTwoProfiles() throws Exception {
+
+    // setup the bolt and execute a tuple
+    ProfilerConfig config = toProfilerConfig(twoProfilesDefined);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // the expected tuple fields
+    final String expectedEntity = "global";
+    {
+      // a tuple should be emitted for the first profile
+      ProfileConfig profile1 = config.getProfiles().get(0);
+      Values expected = new Values(message, timestamp, expectedEntity, profile1);
+      verify(outputCollector, times(1))
+              .emit(eq(tuple), eq(expected));
+    }
+    {
+      // a tuple should be emitted for the second profile
+      ProfileConfig profile2 = config.getProfiles().get(1);
+      Values expected = new Values(message, timestamp, expectedEntity, profile2);
+      verify(outputCollector, times(1))
+              .emit(eq(tuple), eq(expected));
+    }
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
+  }
+
+  /**
+   * No tuples should be emitted if no profiles are defined.
+   */
+  @Test
+  public void testNoProfilesDefined() throws Exception {
+
+    // setup the bolt and execute a tuple
+    ProfilerConfig config = toProfilerConfig(noProfilesDefined);
+    ProfileSplitterBolt bolt = createBolt(config);
+    bolt.execute(tuple);
+
+    // no tuple should be emitted
+    verify(outputCollector, times(0))
+            .emit(any(Tuple.class), any());
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
   }
 
   /**
@@ -154,17 +308,17 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
   @Test
   public void testOnlyIfTrue() throws Exception {
 
-    // setup
-    ProfileSplitterBolt bolt = createBolt(onlyIfTrue);
-
-    // execute
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
+    ProfileSplitterBolt bolt = createBolt(config);
     bolt.execute(tuple);
 
     // a tuple should be emitted for the downstream profile builder
-    verify(outputCollector, times(1)).emit(refEq(tuple), any(Values.class));
+    verify(outputCollector, times(1))
+            .emit(eq(tuple), any(Values.class));
 
     // the original tuple should be ack'd
-    verify(outputCollector, times(1)).ack(tuple);
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
   }
 
   /**
@@ -174,17 +328,17 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
   @Test
   public void testOnlyIfMissing() throws Exception {
 
-    // setup
-    ProfileSplitterBolt bolt = createBolt(onlyIfMissing);
-
-    // execute
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfMissing);
+    ProfileSplitterBolt bolt = createBolt(config);
     bolt.execute(tuple);
 
     // a tuple should be emitted for the downstream profile builder
-    verify(outputCollector, times(1)).emit(refEq(tuple), any(Values.class));
+    verify(outputCollector, times(1))
+            .emit(eq(tuple), any(Values.class));
 
     // the original tuple should be ack'd
-    verify(outputCollector, times(1)).ack(tuple);
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
   }
 
   /**
@@ -194,36 +348,45 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
   @Test
   public void testOnlyIfFalse() throws Exception {
 
-    // setup
-    ProfileSplitterBolt bolt = createBolt(onlyIfFalse);
-
-    // execute
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfFalse);
+    ProfileSplitterBolt bolt = createBolt(config);
     bolt.execute(tuple);
 
     // a tuple should NOT be emitted for the downstream profile builder
-    verify(outputCollector, times(0)).emit(any(Values.class));
+    verify(outputCollector, times(0))
+            .emit(any());
 
     // the original tuple should be ack'd
-    verify(outputCollector, times(1)).ack(tuple);
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
   }
 
   /**
-   * The entity associated with a ProfileMeasurement can be defined using a variable that is resolved
-   * via Stella.  In this case the entity is defined as 'ip_src_addr' which is resolved to
-   * '10.0.0.1' based on the data contained within the message.
+   * The entity associated with a profile is defined with a Stellar expression.  That expression
+   * can refer to any field within the message.
+   *
+   * In this case the entity is defined as 'ip_src_addr' which is resolved to '10.0.0.1' based on
+   * the data contained within the message.
    */
   @Test
   public void testResolveEntityName() throws Exception {
 
-    // setup
-    ProfileSplitterBolt bolt = createBolt(onlyIfTrue);
-
-    // execute
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfTrue);
+    ProfileSplitterBolt bolt = createBolt(config);
     bolt.execute(tuple);
 
-    // verify - the entity name comes from variable resolution in stella
+    // expected values
     String expectedEntity = "10.0.0.1";
-    verify(outputCollector, times(1)).emit(any(Tuple.class), refEq(new Values(expectedEntity, onlyIfTrue, message)));
+    ProfileConfig expectedConfig = config.getProfiles().get(0);
+    Values expected = new Values(message, timestamp, expectedEntity, expectedConfig);
+
+    // a tuple should be emitted for the downstream profile builder
+    verify(outputCollector, times(1))
+            .emit(eq(tuple), eq(expected));
+
+    // the original tuple should be ack'd
+    verify(outputCollector, times(1))
+            .ack(eq(tuple));
   }
 
   /**
@@ -232,11 +395,42 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
   @Test
   public void testOnlyIfInvalid() throws Exception {
 
-    // setup
-    ProfileSplitterBolt bolt = createBolt(onlyIfInvalid);
+    ProfilerConfig config = toProfilerConfig(profileWithOnlyIfInvalid);
+    ProfileSplitterBolt bolt = createBolt(config);
     bolt.execute(tuple);
 
     // a tuple should NOT be emitted for the downstream profile builder
-    verify(outputCollector, times(0)).emit(any(Values.class));
+    verify(outputCollector, times(0))
+            .emit(any(Values.class));
+  }
+
+  /**
+   * Creates a ProfilerConfig based on a string containing JSON.
+   *
+   * @param configAsJSON The config as JSON.
+   * @return The ProfilerConfig.
+   * @throws Exception If the JSON cannot be deserialized.
+   */
+  private ProfilerConfig toProfilerConfig(String configAsJSON) throws Exception {
+    InputStream in = new ByteArrayInputStream(configAsJSON.getBytes("UTF-8"));
+    return JSONUtils.INSTANCE.load(in, ProfilerConfig.class);
   }
+
+  /**
+   * Create a ProfileSplitterBolt to test
+   */
+  private ProfileSplitterBolt createBolt(ProfilerConfig config) throws Exception {
+
+    ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL");
+    bolt.setCuratorFramework(client);
+    bolt.setZKCache(cache);
+    bolt.getConfigurations().updateProfilerConfig(config);
+    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+
+    // set the clock factory AFTER calling prepare to use the fixed clock factory
+    bolt.setClockFactory(new FixedClockFactory(timestamp));
+
+    return bolt;
+  }
+
 }
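
For reference, the 'onlyif' gating and entity resolution exercised by the tests above
are driven by Stellar expressions in the profile definition itself. A minimal sketch
of such a definition, with illustrative values:

    {
      "profiles": [
        {
          "profile": "example",
          "foreach": "ip_src_addr",
          "onlyif":  "protocol == 'HTTP'",
          "init":    { "count": "0" },
          "update":  { "count": "count + 1" },
          "result":  "count"
        }
      ]
    }

Here 'foreach' is evaluated against each message to resolve the entity (e.g. '10.0.0.1')
and 'onlyif' determines whether the splitter routes the message to that profile at all.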

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
new file mode 100644
index 0000000..7e1628b
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/MessageBuilder.java
@@ -0,0 +1,75 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.integration;
+
+import org.json.simple.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Enables simple creation of telemetry messages for testing.
+ */
+public class MessageBuilder {
+
+  private Map<Object, Object> fields;
+
+  /**
+   * Create a new {@link MessageBuilder}.
+   */
+  public MessageBuilder() {
+    this.fields = new HashMap<>();
+  }
+
+  /**
+   * Adds all of the fields from a message to this message.
+   *
+   * @param prototype The other message that is treated as a prototype.
+   * @return A {@link MessageBuilder}
+   */
+  public MessageBuilder withFields(JSONObject prototype) {
+    prototype.forEach((key, val) -> this.fields.put(key, val));
+    return this;
+  }
+
+  /**
+   * Adds a field to the message.
+   *
+   * @param key The field name.
+   * @param value The field value.
+   * @return A {@link MessageBuilder}
+   */
+  public MessageBuilder withField(String key, Object value) {
+    this.fields.put(key, value);
+    return this;
+  }
+
+  /**
+   * Build the message.
+   *
+   * <p>This should be called after defining all of the message fields.
+   *
+   * @return A {@link JSONObject} containing the message fields.
+   */
+  public JSONObject build() {
+    return new JSONObject(fields);
+  }
+}
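
A quick usage sketch of the builder above; the field names and values are illustrative:

    JSONObject message = new MessageBuilder()
            .withField("ip_src_addr", "10.0.0.1")
            .withField("protocol", "HTTP")
            .withField("timestamp", 1500000000000L)
            .build();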

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 0d1b465..c48a3e9 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -28,15 +28,18 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
 import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
 import org.apache.metron.statistics.OnlineStatisticsProvider;
 import org.junit.After;
@@ -49,15 +52,15 @@ import org.junit.Test;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.code.tempusfugit.temporal.Duration.seconds;
 import static com.google.code.tempusfugit.temporal.Timeout.timeout;
 import static com.google.code.tempusfugit.temporal.WaitFor.waitOrTimeout;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 /**
  * An integration test of the Profiler topology.
@@ -105,7 +108,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   private static FluxTopologyComponent fluxComponent;
   private static KafkaComponent kafkaComponent;
   private static ConfigUploadComponent configUploadComponent;
-  private static List<byte[]> input;
   private static ComponentRunner runner;
   private static MockHTable profilerTable;
 
@@ -114,7 +116,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   private static final double epsilon = 0.001;
   private static final String inputTopic = Constants.INDEXING_TOPIC;
   private static final String outputTopic = "profiles";
+  private static final int saltDivisor = 10;
 
+  private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(5);
+  private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(5);
+  private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(15);
+  private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20);
+  private static final long maxRoutesPerBolt = 100000;
 
   /**
    * Tests the first example contained within the README.
@@ -122,22 +130,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   @Test
   public void testExample1() throws Exception {
 
-    update(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-1");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, input);
+    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
-            timeout(seconds(90)));
+            timeout(seconds(180)));
 
     // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
-    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
+    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
+            columnBuilder.getColumnQualifier("value"), Double.class);
 
-    // verify - there are 5 'HTTP' each with 390 bytes
+    // verify - there are 3 'HTTP' messages, each with 390 bytes
     Assert.assertTrue(actuals.stream().anyMatch(val ->
-            MathUtils.equals(390.0 * 5, val, epsilon)
+            MathUtils.equals(390.0 * 3, val, epsilon)
     ));
   }
 
@@ -147,11 +158,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   @Test
   public void testExample2() throws Exception {
 
-    update(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-2");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, input);
+    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
 
     // expect 2 values written by the profile; one for 10.0.0.2 and another for 10.0.0.3
     final int expected = 2;
@@ -161,16 +174,17 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
             timeout(seconds(90)));
 
     // verify - expect 2 results as 2 hosts involved; 10.0.0.2 sends 'HTTP' and 10.0.0.3 send 'DNS'
-    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
+    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
+            columnBuilder.getColumnQualifier("value"), Double.class);
 
-    // verify - 10.0.0.3 -> 1/6
-    Assert.assertTrue( "Could not find a value near 1/6. Actual values read are are: " + Joiner.on(",").join(actuals)
-                     , actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/6.0, epsilon)
+    // verify - 10.0.0.3 -> 1/4
+    Assert.assertTrue( "Could not find a value near 1/4. Actual values read are are: " + Joiner.on(",").join(actuals),
+            actuals.stream().anyMatch(val -> MathUtils.equals(val, 1.0/4.0, epsilon)
     ));
 
-    // verify - 10.0.0.2 -> 6/1
-    Assert.assertTrue("Could not find a value near 6. Actual values read are are: " + Joiner.on(",").join(actuals)
-            ,actuals.stream().anyMatch(val -> MathUtils.equals(val, 6.0/1.0, epsilon)
+    // verify - 10.0.0.2 -> 4/1
+    Assert.assertTrue("Could not find a value near 4. Actual values read are are: " + Joiner.on(",").join(actuals),
+            actuals.stream().anyMatch(val -> MathUtils.equals(val, 4.0/1.0, epsilon)
     ));
   }
 
@@ -180,22 +194,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   @Test
   public void testExample3() throws Exception {
 
-    update(TEST_RESOURCES + "/config/zookeeper/readme-example-3");
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-3");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, input);
+    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
             timeout(seconds(90)));
 
     // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
-    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
+    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
+            columnBuilder.getColumnQualifier("value"), Double.class);
 
     // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
-    Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals)
-                     , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)
+    Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
+            actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)
     ));
   }
 
@@ -205,11 +222,13 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   @Test
   public void testExample4() throws Exception {
 
-    update(TEST_RESOURCES + "/config/zookeeper/readme-example-4");
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/readme-example-4");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, input);
+    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
@@ -220,34 +239,109 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     List<OnlineStatisticsProvider> actuals = read(profilerTable.getPutLog(), columnFamily, column, OnlineStatisticsProvider.class);
 
     // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
-    Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals)
-                     , actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon)
+    Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals),
+            actuals.stream().anyMatch(val -> MathUtils.equals(val.getMean(), 20.0, epsilon)
     ));
   }
 
   @Test
   public void testPercentiles() throws Exception {
 
-    update(TEST_RESOURCES + "/config/zookeeper/percentiles");
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/percentiles");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(inputTopic, message1, message1, message1);
+    kafkaComponent.writeMessages(inputTopic, message2, message2, message2);
+    kafkaComponent.writeMessages(inputTopic, message3, message3, message3);
+
+    // verify - ensure the profile is being persisted
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
+            timeout(seconds(90)));
+
+    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily,
+            columnBuilder.getColumnQualifier("value"), Double.class);
 
+    // verify - the 70th percentile of 3 x 20s = 20.0
+    Assert.assertTrue("Could not find a value near 20. Actual values read are: " + Joiner.on(",").join(actuals),
+            actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)));
+  }
+
+  /**
+   * The Profiler can optionally perform event time processing.  With event time processing,
+   * the Profiler uses timestamps contained in the source telemetry.
+   *
+   * <p>Defining a 'timestampField' within the Profiler configuration tells the Profiler
+   * from which field the timestamp should be extracted.
+   */
+  @Test
+  public void testEventTimeProcessing() throws Exception {
+
+    // constants used for the test
+    final long startAt = 10;
+    final String entity = "10.0.0.1";
+    final String profileName = "event-time-test";
+
+    // create some messages that contain a timestamp - a very old timestamp, close to 1970
+    String message1 = new MessageBuilder()
+            .withField("ip_src_addr", entity)
+            .withField("timestamp", startAt)
+            .build()
+            .toJSONString();
+
+    String message2 = new MessageBuilder()
+            .withField("ip_src_addr", entity)
+            .withField("timestamp", startAt + 100)
+            .build()
+            .toJSONString();
+
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/event-time-test");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(inputTopic, input);
+    kafkaComponent.writeMessages(inputTopic, message1, message2);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
             timeout(seconds(90)));
 
-    List<Double> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Double.class);
+    List<Put> puts = profilerTable.getPutLog();
+    assertEquals(1, puts.size());
+
+    // inspect the row key to ensure the profiler used event time correctly.  the timestamps
+    // embedded in the row key should match those in the source telemetry
+    byte[] expectedRowKey = generateExpectedRowKey(profileName, entity, startAt);
+    byte[] actualRowKey = puts.get(0).getRow();
+    String msg = String.format("expected '%s', got '%s'",
+            new String(expectedRowKey, "UTF-8"),
+            new String(actualRowKey, "UTF-8"));
+    assertArrayEquals(msg, expectedRowKey, actualRowKey);
+  }
 
-    // verify - the 70th percentile of 5 x 20s = 20.0
-    Assert.assertTrue("Could not find a value near 20. Actual values read are are: " + Joiner.on(",").join(actuals)
-                     , actuals.stream().anyMatch(val -> MathUtils.equals(val, 20.0, epsilon)));
+  /**
+   * Generates the expected row key.
+   *
+   * @param profileName The name of the profile.
+   * @param entity The entity.
+   * @param whenMillis A timestamp in epoch milliseconds.
+   * @return A row key.
+   */
+  private byte[] generateExpectedRowKey(String profileName, String entity, long whenMillis) {
+
+    // only the profile name, entity, and period are used to generate the row key
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName(profileName)
+            .withEntity(entity)
+            .withPeriod(whenMillis, periodDurationMillis, TimeUnit.MILLISECONDS);
+
+    // build the row key
+    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS);
+    return rowKeyBuilder.rowKey(measurement);
   }
 
   /**
    * Reads a value written by the Profiler.
+   *
    * @param family The column family.
    * @param qualifier The column qualifier.
    * @param clazz The expected type of the value.
@@ -258,7 +352,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     List<T> results = new ArrayList<>();
 
     for(Put put: puts) {
-      for(Cell cell: put.get(Bytes.toBytes(family), qualifier)) {
+      List<Cell> cells = put.get(Bytes.toBytes(family), qualifier);
+      for(Cell cell : cells) {
         T value = SerDeUtils.fromBytes(cell.getValue(), clazz);
         results.add(value);
       }
@@ -271,39 +366,41 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   public static void setupBeforeClass() throws UnableToStartException {
     columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
 
-    List<String> inputNew = Stream.of(message1, message2, message3)
-        .map(m -> Collections.nCopies(5, m))
-        .flatMap(l -> l.stream())
-        .collect(Collectors.toList());
-
-    // create input messages for the profiler to consume
-    input = Stream.of(message1, message2, message3)
-            .map(Bytes::toBytes)
-            .map(m -> Collections.nCopies(5, m))
-            .flatMap(l -> l.stream())
-            .collect(Collectors.toList());
-
     // storm topology properties
     final Properties topologyProperties = new Properties() {{
-      setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
+
+      // storm settings
       setProperty("profiler.workers", "1");
       setProperty("profiler.executors", "0");
+      setProperty("storm.auto.credentials", "[]");
+      setProperty("topology.auto-credentials", "[]");
+      setProperty("topology.message.timeout.secs", "60");
+      setProperty("topology.max.spout.pending", "100000");
+
+      // kafka settings
       setProperty("profiler.input.topic", inputTopic);
       setProperty("profiler.output.topic", outputTopic);
-      setProperty("profiler.period.duration", "20");
-      setProperty("profiler.period.duration.units", "SECONDS");
-      setProperty("profiler.ttl", "30");
-      setProperty("profiler.ttl.units", "MINUTES");
-      setProperty("profiler.hbase.salt.divisor", "10");
+      setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
+      setProperty("kafka.security.protocol", "PLAINTEXT");
+
+      // hbase settings
+      setProperty("profiler.hbase.salt.divisor", Integer.toString(saltDivisor));
       setProperty("profiler.hbase.table", tableName);
       setProperty("profiler.hbase.column.family", columnFamily);
       setProperty("profiler.hbase.batch", "10");
       setProperty("profiler.hbase.flush.interval.seconds", "1");
-      setProperty("profiler.profile.ttl", "20");
       setProperty("hbase.provider.impl", "" + MockHBaseTableProvider.class.getName());
-      setProperty("storm.auto.credentials", "[]");
-      setProperty("kafka.security.protocol", "PLAINTEXT");
-      setProperty("topology.auto-credentials", "[]");
+
+      // profile settings
+      setProperty("profiler.period.duration", Long.toString(periodDurationMillis));
+      setProperty("profiler.period.duration.units", "MILLISECONDS");
+      setProperty("profiler.ttl", Long.toString(profileTimeToLiveMillis));
+      setProperty("profiler.ttl.units", "MILLISECONDS");
+      setProperty("profiler.window.duration", Long.toString(windowDurationMillis));
+      setProperty("profiler.window.duration.units", "MILLISECONDS");
+      setProperty("profiler.window.lag", Long.toString(windowLagMillis));
+      setProperty("profiler.window.lag.units", "MILLISECONDS");
+      setProperty("profiler.max.routes.per.bolt", Long.toString(maxRoutesPerBolt));
     }};
 
     // create the mock table
@@ -311,7 +408,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     zkComponent = getZKServerComponent(topologyProperties);
 
-    // create the input topic
+    // create the input and output topics
     kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList(
             new KafkaComponent.Topic(inputTopic, 1),
             new KafkaComponent.Topic(outputTopic, 1)));
@@ -340,12 +437,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     runner.start();
   }
 
-  public void update(String path) throws Exception {
-    configUploadComponent.withGlobalConfiguration(path)
-        .withProfilerConfiguration(path);
-    configUploadComponent.update();
-  }
-
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     MockHBaseTableProvider.clear();
@@ -368,4 +459,16 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       runner.reset();
     }
   }
-}
\ No newline at end of file
+
+  /**
+   * Uploads config values to Zookeeper.
+   * @param path The path on the local filesystem to the config values.
+   * @throws Exception If the config values cannot be uploaded.
+   */
+  public void uploadConfig(String path) throws Exception {
+    configUploadComponent
+            .withGlobalConfiguration(path)
+            .withProfilerConfiguration(path)
+            .update();
+  }
+}
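
The event time test above depends on the 'timestampField' setting. A minimal sketch of
a profiler configuration that opts in to event time processing; the profile body is
illustrative, while 'timestampField' names the telemetry field carrying the
epoch-millisecond timestamp:

    {
      "timestampField": "timestamp",
      "profiles": [
        {
          "profile": "event-time-test",
          "foreach": "ip_src_addr",
          "init":   { "count": "0" },
          "update": { "count": "count + 1" },
          "result": "count"
        }
      ]
    }

If no 'timestampField' is defined, the Profiler falls back to processing time, i.e. the
wall-clock time at which each message happens to be processed.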

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml
index c7f6ce2..8546b56 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-profiler-env.xml
@@ -57,9 +57,33 @@
       <type>value-list</type>
       <entries>
         <entry>
-          <value>DAYS</value>
+          <value>HOURS</value>
         </entry>
         <entry>
+          <value>MINUTES</value>
+        </entry>
+        <entry>
+          <value>SECONDS</value>
+        </entry>
+      </entries>
+      <selection-cardinality>1</selection-cardinality>
+    </value-attributes>
+  </property>
+  <property>
+    <name>profiler_window_duration</name>
+    <value>30</value>
+    <description>The duration of each profile window. This value should be defined along with profiler.window.duration.units.</description>
+    <display-name>Window Duration</display-name>
+  </property>
+  <property>
+    <name>profiler_window_units</name>
+    <value>SECONDS</value>
+    <description>The units used to specify the profiler.window.duration. This value should be defined along with profiler.window.duration.</description>
+    <display-name>Window Units</display-name>
+    <value-attributes>
+      <type>value-list</type>
+      <entries>
+        <entry>
           <value>HOURS</value>
         </entry>
         <entry>
@@ -71,7 +95,6 @@
       </entries>
       <selection-cardinality>1</selection-cardinality>
     </value-attributes>
-
   </property>
   <property>
     <name>profiler_ttl</name>
@@ -104,8 +127,54 @@
       </entries>
       <selection-cardinality>1</selection-cardinality>
     </value-attributes>
-
-
+  </property>
+  <property>
+    <name>profiler_window_lag</name>
+    <value>1</value>
+    <description>The maximum time lag for timestamps. Timestamps cannot arrive out-of-order by more than this amount.</description>
+    <display-name>Window Time Lag</display-name>
+  </property>
+  <property>
+    <name>profiler_window_lag_units</name>
+    <value>MINUTES</value>
+    <description>The units used to specify the profiler.window.lag. This value should be defined along with profiler.window.lag.</description>
+    <display-name>Window Lag Units</display-name>
+    <value-attributes>
+      <type>value-list</type>
+      <entries>
+        <entry>
+          <value>HOURS</value>
+        </entry>
+        <entry>
+          <value>MINUTES</value>
+        </entry>
+        <entry>
+          <value>SECONDS</value>
+        </entry>
+      </entries>
+      <selection-cardinality>1</selection-cardinality>
+    </value-attributes>
+  </property>
+  <property>
+    <name>profiler_topology_message_timeout_secs</name>
+    <description>The maximum amount of time a message has to complete before it is considered failed.</description>
+    <display-name>Profiler Topology Message Timeout</display-name>
+    <value>900</value>
+  </property>
+  <property>
+    <name>profiler_topology_max_spout_pending</name>
+    <description>The maximum number of tuples that can be pending on a single spout task at any one time.</description>
+    <display-name>Spout Max Pending Tuples</display-name>
+    <value/>
+    <value-attributes>
+        <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+  </property>
+  <property>
+    <name>profiler_max_routes_per_bolt</name>
+    <value>100000</value>
+    <description>The max number of routes allowed per bolt. The number of routes increases as the number of profiles and entities increases.</description>
+    <display-name>Max Routes Per Bolt</display-name>
   </property>
   <property>
     <name>profiler_hbase_table</name>