You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2017/03/02 22:02:13 UTC

[2/2] incubator-metron git commit: METRON-701 Triage Metrics Produced by the Profiler (nickwallen) closes apache/incubator-metron#449

METRON-701 Triage Metrics Produced by the Profiler (nickwallen) closes apache/incubator-metron#449


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/818b0b17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/818b0b17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/818b0b17

Branch: refs/heads/master
Commit: 818b0b17b131d875259178e61e59d32d02ae792a
Parents: e662849
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Mar 2 17:01:41 2017 -0500
Committer: Nick Allen <ni...@nickallen.org>
Committed: Thu Mar 2 17:01:41 2017 -0500

----------------------------------------------------------------------
 dependencies_with_url.csv                       |   1 +
 .../metron/profiler/client/ProfileWriter.java   |   4 +-
 .../apache/metron/profiler/ProfileBuilder.java  |  34 ++-
 .../metron/profiler/ProfileMeasurement.java     | 109 ++++++----
 .../apache/metron/profiler/ProfilePeriod.java   |   7 +
 .../profiler/hbase/ValueOnlyColumnBuilder.java  |   2 +-
 .../metron/profiler/ProfileBuilderTest.java     |  93 ++++++++-
 .../profiler/hbase/SaltyRowKeyBuilderTest.java  |   2 +-
 metron-analytics/metron-profiler/README.md      |  88 +++++---
 metron-analytics/metron-profiler/pom.xml        |  24 +++
 .../src/main/config/profiler.properties         |   2 +
 .../src/main/flux/profiler/remote.yaml          |  45 +++-
 .../profiler/bolt/DestinationHandler.java       |  56 +++++
 .../profiler/bolt/HBaseDestinationHandler.java  |  58 ++++++
 .../profiler/bolt/KafkaDestinationHandler.java  | 110 ++++++++++
 .../profiler/bolt/ProfileBuilderBolt.java       | 100 +++++----
 .../profiler/bolt/ProfileHBaseMapper.java       |   3 +-
 .../profiler/bolt/ProfileSplitterBolt.java      |   3 +-
 .../zookeeper/readme-example-4/profiler.json    |  11 +
 .../zookeeper/write-integer/profiler.json       |  11 -
 .../bolt/KafkaDestinationHandlerTest.java       | 203 ++++++++++++++++++
 .../profiler/bolt/ProfileBuilderBoltTest.java   |  64 ++++--
 .../profiler/bolt/ProfileHBaseMapperTest.java   |  19 +-
 .../profiler/bolt/ProfileSplitterBoltTest.java  |   8 +-
 .../integration/ProfilerIntegrationTest.java    |  45 ++--
 .../configuration/profiler/ProfileConfig.java   |  71 ++++---
 .../configuration/profiler/ProfileResult.java   |  99 +++++++++
 .../profiler/ProfileResultExpressions.java      |  57 +++++
 .../profiler/ProfileTriageExpressions.java      |  67 ++++++
 .../configuration/profiler/ProfilerConfig.java  |   2 +-
 .../profiler/ProfilerConfigurations.java        |   2 +-
 .../apache/metron/common/utils/JSONUtils.java   |  42 ++--
 .../profiler/ProfileConfigTest.java             | 207 +++++++++++++++++++
 33 files changed, 1404 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 1d428c3..819ab84 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -12,6 +12,7 @@ org.fusesource.jansi:jansi:jar:1.11:compile,Apache v2,https://github.com/fusesou
 de.javakaffee:kryo-serializers:jar:0.38:compile,Apache v2,https://github.com/magro/kryo-serializers
 com.tdunning:t-digest:jar:3.1:compile,Apache v2,https://github.com/tdunning/t-digest
 com.esotericsoftware:kryo:jar:3.0.3:compile,New BSD License,http://code.google.com/p/kryo
+com.esotericsoftware:kryo-shaded:jar:3.0.3:compile,New BSD License,http://code.google.com/p/kryo
 com.esotericsoftware.kryo:kryo:jar:2.21:compile,New BSD License,http://code.google.com/p/kryo/
 com.esotericsoftware.minlog:minlog:jar:1.2:compile,New BSD License,http://code.google.com/p/minlog/
 com.esotericsoftware.minlog:minlog:jar:1.3.0:compile,New BSD License,http://code.google.com/p/minlog/

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index 6e2b11e..317227b 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -71,7 +71,7 @@ public class ProfileWriter {
     for(int i=0; i<count; i++) {
 
       // generate the next value that should be written
-      Object nextValue = valueGenerator.apply(m.getValue());
+      Object nextValue = valueGenerator.apply(m.getProfileValue());
 
       // create a measurement for the next profile period to be written
       ProfilePeriod next = m.getPeriod().next();
@@ -80,7 +80,7 @@ public class ProfileWriter {
               .withEntity(prototype.getEntity())
               .withPeriod(next.getStartTimeMillis(), prototype.getPeriod().getDurationMillis(), TimeUnit.MILLISECONDS)
               .withGroups(group)
-              .withValue(nextValue);
+              .withProfileValue(nextValue);
 
       write(m);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
index 4c38fac..b444ba1 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
@@ -26,6 +26,7 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfileResult;
 import org.apache.metron.common.dsl.Context;
 import org.apache.metron.common.dsl.ParseException;
 import org.apache.metron.common.dsl.StellarFunctions;
@@ -40,9 +41,11 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static java.lang.String.format;
 
@@ -140,26 +143,36 @@ public class ProfileBuilder implements Serializable {
    *
    * Completes and emits the ProfileMeasurement.  Clears all state in preparation for
    * the next window period.
+   *
    * @return Returns the completed profile measurement.
    */
   public ProfileMeasurement flush() {
     LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity);
 
-    // execute the 'result' expression
+    // execute the 'profile' expression(s)
     @SuppressWarnings("unchecked")
-    Object value = execute(definition.getResult(), new JSONObject(), "result");
+    Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile");
+
+    // execute the 'triage' expression(s)
+    Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions()
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                    e -> e.getKey(),
+                    e -> execute(e.getValue(), "result/triage")));
 
     // execute the 'groupBy' expression(s) - can refer to value of 'result' expression
-    List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", value), "groupBy");
+    List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", profileValue), "groupBy");
 
     isInitialized = false;
-
     return new ProfileMeasurement()
             .withProfileName(profileName)
             .withEntity(entity)
             .withGroups(groups)
             .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS)
-            .withValue(value);
+            .withProfileValue(profileValue)
+            .withTriageValues(triageValues)
+            .withDefinition(definition);
   }
 
   /**
@@ -181,6 +194,17 @@ public class ProfileBuilder implements Serializable {
   }
 
   /**
+   * Executes an expression contained within the profile definition.
+   * @param expression The expression to execute.
+   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
+   * @return The result of executing the expression.
+   */
+  private Object execute(String expression, String expressionType) {
+    return execute(expression, Collections.emptyMap(), expressionType);
+  }
+
+
+  /**
    * Executes a set of expressions whose results need to be assigned to a variable.
    * @param expressions Maps the name of a variable to the expression whose result should be assigned to it.
    * @param transientState Additional transient state provided to the expression.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index bbd17a5..e9ac945 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -20,8 +20,11 @@
 
 package org.apache.metron.profiler;
 
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -43,11 +46,6 @@ public class ProfileMeasurement {
   private String entity;
 
   /**
-   * The actual measurement itself.
-   */
-  private Object value;
-
-  /**
    * The 'groups' used to sort the Profile data. The groups are the result of
    * executing the Profile's 'groupBy' expression.
    */
@@ -58,6 +56,24 @@ public class ProfileMeasurement {
    */
   private ProfilePeriod period;
 
+  /**
+   * The profile definition that resulted in this measurement.
+   */
+  private ProfileConfig definition;
+
+  /**
+   * The result of evaluating the profile expression.
+   */
+  private Object profileValue;
+
+  /**
+   * The result of evaluating the triage expression(s).
+   *
+   * A profile can generate one or more values that can be used during the
+   * threat triage process.  Each value is given a unique name.
+   */
+  private Map<String, Object> triageValues;
+
   public ProfileMeasurement() {
     this.groups = Collections.emptyList();
   }
@@ -72,11 +88,6 @@ public class ProfileMeasurement {
     return this;
   }
 
-  public ProfileMeasurement withValue(Object value) {
-    this.value = value;
-    return this;
-  }
-
   public ProfileMeasurement withGroups(List<Object> groups) {
     this.groups = groups;
     return this;
@@ -87,58 +98,74 @@ public class ProfileMeasurement {
     return this;
   }
 
+  public ProfileMeasurement withDefinition(ProfileConfig definition) {
+    this.definition = definition;
+    return this;
+  }
+
+  public ProfileMeasurement withProfileValue(Object profileValue) {
+    this.profileValue = profileValue;
+    return this;
+  }
+
+  public ProfileMeasurement withTriageValues(Map<String, Object> triageValues) {
+    this.triageValues = triageValues;
+    return this;
+  }
+
   public String getProfileName() {
     return profileName;
   }
 
+  public void setProfileName(String profileName) {
+    this.profileName = profileName;
+  }
+
   public String getEntity() {
     return entity;
   }
 
-  public Object getValue() {
-    return value;
+  public void setEntity(String entity) {
+    this.entity = entity;
+  }
+
+  public List<Object> getGroups() {
+    return groups;
+  }
+
+  public void setGroups(List<Object> groups) {
+    this.groups = groups;
   }
 
   public ProfilePeriod getPeriod() {
     return period;
   }
 
-  public List<Object> getGroups() {
-    return groups;
+  public void setPeriod(ProfilePeriod period) {
+    this.period = period;
   }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+  public ProfileConfig getDefinition() {
+    return definition;
+  }
 
-    ProfileMeasurement that = (ProfileMeasurement) o;
+  public void setDefinition(ProfileConfig definition) {
+    this.definition = definition;
+  }
+
+  public Object getProfileValue() {
+    return profileValue;
+  }
 
-    if (profileName != null ? !profileName.equals(that.profileName) : that.profileName != null) return false;
-    if (entity != null ? !entity.equals(that.entity) : that.entity != null) return false;
-    if (value != null ? !value.equals(that.value) : that.value != null) return false;
-    if (groups != null ? !groups.equals(that.groups) : that.groups != null) return false;
-    return period != null ? period.equals(that.period) : that.period == null;
+  public void setProfileValue(Object profileValue) {
+    this.profileValue = profileValue;
   }
 
-  @Override
-  public int hashCode() {
-    int result = profileName != null ? profileName.hashCode() : 0;
-    result = 31 * result + (entity != null ? entity.hashCode() : 0);
-    result = 31 * result + (value != null ? value.hashCode() : 0);
-    result = 31 * result + (groups != null ? groups.hashCode() : 0);
-    result = 31 * result + (period != null ? period.hashCode() : 0);
-    return result;
+  public Map<String, Object> getTriageValues() {
+    return triageValues;
   }
 
-  @Override
-  public String toString() {
-    return "ProfileMeasurement{" +
-            "profileName='" + profileName + '\'' +
-            ", entity='" + entity + '\'' +
-            ", value=" + value +
-            ", groups=" + groups +
-            ", period=" + period +
-            '}';
+  public void setTriageValues(Map<String, Object> triageValues) {
+    this.triageValues = triageValues;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
index 2f7f356..c2d8b21 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
@@ -69,6 +69,13 @@ public class ProfilePeriod {
   }
 
   /**
+   * When this period ended in milliseconds since the epoch.
+   */
+  public long getEndTimeMillis() {
+    return getStartTimeMillis() + getDurationMillis();
+  }
+
+  /**
    * Returns the next ProfilePeriod in time.
    */
   public ProfilePeriod next() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
index bb1baf6..88ec806 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
@@ -49,7 +49,7 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
   public ColumnList columns(ProfileMeasurement measurement) {
 
     ColumnList cols = new ColumnList();
-    cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), SerDeUtils.toBytes(measurement.getValue()));
+    cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), SerDeUtils.toBytes(measurement.getProfileValue()));
 
     return cols;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
index 1434353..794fde4 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
@@ -90,7 +90,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate that x = 100, y = 200
-    assertEquals(100 + 200, (int) convert(m.getValue(), Integer.class));
+    assertEquals(100 + 200, (int) convert(m.getProfileValue(), Integer.class));
   }
 
   /**
@@ -111,7 +111,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate that x = 0 and y = 0 as no initialization occurred
-    assertEquals(0, (int) convert(m.getValue(), Integer.class));
+    assertEquals(0, (int) convert(m.getProfileValue(), Integer.class));
   }
 
   /**
@@ -153,7 +153,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate that x=0, y=0 then x+=1, y+=2 for each message
-    assertEquals(count*1 + count*2, (int) convert(m.getValue(), Integer.class));
+    assertEquals(count*1 + count*2, (int) convert(m.getProfileValue(), Integer.class));
   }
 
   /**
@@ -185,7 +185,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate
-    assertEquals(100, (int) convert(m.getValue(), Integer.class));
+    assertEquals(100, (int) convert(m.getProfileValue(), Integer.class));
   }
 
   /**
@@ -260,8 +260,8 @@ public class ProfileBuilderTest {
 
     // validate
     assertEquals(2, m.getGroups().size());
-    assertEquals(100, (int) convert(m.getGroups().get(0), Integer.class));
-    assertEquals(200, (int) convert(m.getGroups().get(1), Integer.class));
+    assertEquals(100, m.getGroups().get(0));
+    assertEquals(200, m.getGroups().get(1));
   }
 
   /**
@@ -304,7 +304,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate
-    assertEquals(33, (int) convert(m.getValue(), Integer.class));
+    assertEquals(33, m.getProfileValue());
   }
 
   /**
@@ -347,7 +347,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate
-    assertEquals(3, (int) convert(m.getValue(), Integer.class));
+    assertEquals(3, m.getProfileValue());
   }
   /**
    * {
@@ -381,4 +381,81 @@ public class ProfileBuilderTest {
     assertEquals(entity, m.getEntity());
   }
 
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *      "x": "100"
+   *   },
+   *   "result": {
+   *      "profile": "x"
+   *   }
+   * }
+   */
+  @Multiline
+  private String testResultWithProfileExpression;
+
+  /**
+   * Ensure that the result expression is executed on a flush.
+   */
+  @Test
+  public void testResultWithProfileExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(100, m.getProfileValue());
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *      "x": "100"
+   *   },
+   *   "result": {
+   *      "profile": "x",
+   *      "triage": {
+   *        "zero": "x - 100",
+   *        "hundred": "x"
+   *      }
+   *   }
+   * }
+   */
+  @Multiline
+  private String testResultWithTriageExpression;
+
+  /**
+   * Ensure that the triage expressions, along with the profile expression, are executed on a flush.
+   */
+  @Test
+  public void testResultWithTriageExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(0, m.getTriageValues().get("zero"));
+    assertEquals(100, m.getTriageValues().get("hundred"));
+    assertEquals(100, m.getProfileValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
index 5d7d121..57edea0 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
@@ -226,7 +226,7 @@ public class SaltyRowKeyBuilderTest {
             .withProfileName("profile")
             .withEntity("entity")
             .withPeriod(oldest, periodDuration, periodUnits)
-            .withValue(22);
+            .withProfileValue(22);
 
     // generate a list of expected keys
     List<byte[]> expectedKeys = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md
index 9768c07..b4fc104 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -80,16 +80,16 @@ The Profiler specification requires a JSON-formatted set of elements, many of wh
 The specification for the Profiler topology is stored in Zookeeper at  `/metron/topology/profiler`.  These properties also exist in the local filesystem at `$METRON_HOME/config/zookeeper/profiler.json`. 
 The values can be changed on disk and then uploaded to Zookeeper using `$METRON_HOME/bin/zk_load_configs.sh`.
 
-| Name                  | Required | Description                                                                             |
-| --------------------- | -------- | --------------------------------------------------------------------------------------- |
-| [profile](#profile)   | Required | Unique name identifying the profile.                                                    |
-| [foreach](#foreach)   | Required | A separate profile is maintained "for each" of these.                                   |
-| [onlyif](#onlyif)     | Optional | Boolean expression that determines if a message should be applied to the profile.       |
-| [groupBy](#groupby)   | Optional | One or more Stellar expressions used to group the profile measurements when persisted.  |
-| [init](#init)         | Optional | One or more expressions executed at the start of a window period.                       |
-| [update](#update)     | Required | One or more expressions executed when a message is applied to the profile.              |
-| [result](#result)     | Required | A Stellar expression that is executed when the window period expires.                   |
-| [expires](#expires)   | Optional | Profile data is purged after this period of time, specified in milliseconds.            |
+| Name 	                        |               | Description 	
+|---	                        |---	        |---
+| [profile](#profile)           | Required   	| Unique name identifying the profile. 
+| [foreach](#foreach)           | Required  	| A separate profile is maintained "for each" of these. 
+| [onlyif](#onlyif)  	        | Optional  	| Boolean expression that determines if a message should be applied to the profile.
+| [groupBy](#groupby)           | Optional      | One or more Stellar expressions used to group the profile measurements when persisted.
+| [init](#init)  	            | Optional  	| One or more expressions executed at the start of a window period.
+| [update](#update)  	        | Required  	| One or more expressions executed when a message is applied to the profile.
+| [result](#result)   	        | Required  	| Stellar expressions that are executed when the window period expires.
+| [expires](#expires)           | Optional      | Profile data is purged after this period of time, specified in milliseconds.
 
 ### `profile` 
 
@@ -153,7 +153,44 @@ One or more expressions executed when a message is applied to the profile.  A ma
 
 *Required*
 
-A Stellar expression that is executed when the window period expires.  The expression is expected to summarize the messages that were applied to the profile over the window period, using the state accumulated by the updates.  The result will typically be a single numeric value, but it may be any serializable object, as shown in Example 4 below.  	   
+Stellar expressions that are executed when the window period expires.  The expressions are expected to summarize the messages that were applied to the profile over the window period.  In the most basic form a single result is persisted for later retrieval.
+```
+"result": "var1 + var2"
+```
+
+For more advanced use cases, a profile can generate two types of results.  A profile can define one or both of these result types at the same time. 
+* `profile`:  A required expression that defines a value that is persisted for later retrieval.
+* `triage`: An optional map of expressions that define values accessible within the Threat Triage process.
+
+**profile**
+
+A required Stellar expression that results in a value that is persisted in the profile store for later retrieval.  The expression can result in any object that is Kryo serializable.  These values can be retrieved for later use with the [Profiler Client](../metron-profiler-client). 
+```
+"result": {
+    "profile": "2 + 2"
+}
+```
+
+An alternative, simplified form is also acceptable.
+```
+"result": "2 + 2"
+
+```
+
+**triage**
+
+An optional map of one or more Stellar expressions. The value of each expression is made available to the Threat Triage process under the given name.  Each expression must result in either a primitive type, like an integer, long, or short, or a String.  All other types will result in an error.
+
+In the following example, three values (the minimum, the maximum, and the mean) are appended to a message.  This message is consumed by Metron, like other sources of telemetry, and each of these values is accessible from within the Threat Triage process using the given field names: `min`, `max`, and `mean`.
+```
+"result": {
+    "triage": {
+        "min": "STATS_MIN(stats)",
+        "max": "STATS_MAX(stats)",
+        "mean": "STATS_MEAN(stats)"
+    }
+}
+```
 
 ### `expires`
 
@@ -166,21 +203,22 @@ A numeric value that defines how many days the profile data is retained.  After
 The Profiler runs as an independent Storm topology.  The configuration for the Profiler topology is stored on the local filesystem at `$METRON_HOME/config/profiler.properties`. 
 The values can be changed on disk and then the Profiler topology must be restarted.
 
-| Setting                               | Description                                                                                                                                                                                                 |
-| ------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| profiler.workers                      | The number of worker processes to create for the topology.                                                                                                                                                  |
-| profiler.executors                    | The number of executors to spawn per component.                                                                                                                                                             |
-| profiler.input.topic                  | The name of the Kafka topic from which to consume data.                                                                                                                                                     |
-| profiler.period.duration              | The duration of each profile period.  This value should be defined along with `profiler.period.duration.units`.                                                                                             |
-| profiler.period.duration.units        | The units used to specify the `profiler.period.duration`.                                                                                                                                                   |
-| profiler.ttl                          | If a message has not been applied to a Profile in this period of time, the Profile will be forgotten and its resources will be cleaned up. This value should be defined along with `profiler.ttl.units`.    |
-| profiler.ttl.units                    | The units used to specify the `profiler.ttl`.                                                                                                                                                               |
-| profiler.hbase.salt.divisor           |  A salt is prepended to the row key to help prevent hotspotting.  This constant is used to generate the salt.  Ideally, this constant should be roughly equal to the number of nodes in the Hbase cluster.  |
-| profiler.hbase.table                  | The name of the HBase table that profiles are written to.                                                                                                                                                   |
-| profiler.hbase.column.family          | The column family used to store profiles.                                                                                                                                                                   |
-| profiler.hbase.batch                  | The number of puts that are written in a single batch.                                                                                                                                                      |
-| profiler.hbase.flush.interval.seconds | The maximum number of seconds between batch writes to HBase.                                                                                                                                                |
 
+| Setting   | Description   |
+|---        |---            |
+| profiler.workers | The number of worker processes to create for the topology.   |
+| profiler.executors | The number of executors to spawn per component.  |
+| profiler.input.topic | The name of the Kafka topic from which to consume data.  |
+| profiler.output.topic | The name of the Kafka topic to which profile data is written.  Only used with profiles that use the [`triage` result field](#result).  |
+| profiler.period.duration | The duration of each profile period.  This value should be defined along with `profiler.period.duration.units`.  |
+| profiler.period.duration.units | The units used to specify the `profiler.period.duration`. |
+| profiler.ttl | If a message has not been applied to a Profile in this period of time, the Profile will be forgotten and its resources will be cleaned up. This value should be defined along with `profiler.ttl.units`. |
+| profiler.ttl.units | The units used to specify the `profiler.ttl`. |
+| profiler.hbase.salt.divisor  |  A salt is prepended to the row key to help prevent hotspotting.  This constant is used to generate the salt.  Ideally, this constant should be roughly equal to the number of nodes in the HBase cluster.  |
+| profiler.hbase.table | The name of the HBase table that profiles are written to.  |
+| profiler.hbase.column.family | The column family used to store profiles. |
+| profiler.hbase.batch | The number of puts that are written in a single batch.  |
+| profiler.hbase.flush.interval.seconds | The maximum number of seconds between batch writes to HBase. |
 
 After altering the configuration, start the Profiler.
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/pom.xml b/metron-analytics/metron-profiler/pom.xml
index 355c4b0..3c8baef 100644
--- a/metron-analytics/metron-profiler/pom.xml
+++ b/metron-analytics/metron-profiler/pom.xml
@@ -54,12 +54,27 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-writer</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-statistics</artifactId>
             <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>kryo</artifactId>
+                    <groupId>com.esotericsoftware</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
@@ -98,6 +113,11 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo-shaded</artifactId>
+            <version>3.0.3</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
             <version>${global_hadoop_version}</version>
@@ -164,6 +184,10 @@
                     <artifactId>log4j-slf4j-impl</artifactId>
                     <groupId>org.apache.logging.log4j</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>kryo</artifactId>
+                    <groupId>com.esotericsoftware</groupId>
+                </exclusion>
             </exclusions>
             <scope>provided</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index 1f4cc0d..91e4226 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -23,6 +23,7 @@
 profiler.workers=1
 profiler.executors=0
 profiler.input.topic=indexing
+profiler.output.topic=enrichments
 profiler.period.duration=15
 profiler.period.duration.units=MINUTES
 profiler.ttl=30
@@ -33,6 +34,7 @@ profiler.hbase.column.family=P
 profiler.hbase.batch=10
 profiler.hbase.flush.interval.seconds=30
 
+
 ##### Kafka #####
 
 kafka.zk=node1:2181

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 0b2a0db..0a26b73 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -17,10 +17,12 @@
 name: "profiler"
 
 config:
+
     topology.workers: ${profiler.workers}
     topology.acker.executors: ${profiler.executors}
 
 components:
+
     -   id: "rowKeyBuilder"
         className: "org.apache.metron.profiler.hbase.SaltyRowKeyBuilder"
         properties:
@@ -61,8 +63,21 @@ components:
             - "indexing"
         configMethods:
             -   name: "from"
-                args:
-                    - "${kafka.start}"
+                args: ["${kafka.start}"]
+
+    -   id: "kafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
+        configMethods:
+            -   name: "withTopic"
+                args: ["${profiler.output.topic}"]
+            -   name: "withZkQuorum"
+                args: ["${kafka.zk}"]
+
+    -   id: "kafkaDestinationHandler"
+        className: "org.apache.metron.profiler.bolt.KafkaDestinationHandler"
+
+    -   id: "hbaseDestinationHandler"
+        className: "org.apache.metron.profiler.bolt.HBaseDestinationHandler"
 
 spouts:
 
@@ -85,8 +100,12 @@ bolts:
         configMethods:
             - name: "withPeriodDuration"
               args: [${profiler.period.duration}, "${profiler.period.duration.units}"]
-            - name: "withTimeToLive"
+            - name: "withProfileTimeToLive"
               args: [${profiler.ttl}, "${profiler.ttl.units}"]
+            - name: "withDestinationHandler"
+              args: [ref: "kafkaDestinationHandler"]
+            - name: "withDestinationHandler"
+              args: [ref: "hbaseDestinationHandler"]
 
     -   id: "hbaseBolt"
         className: "org.apache.metron.hbase.bolt.HBaseBolt"
@@ -101,6 +120,16 @@ bolts:
             - name: "withFlushIntervalSecs"
               args: [${profiler.hbase.flush.interval.seconds}]
 
+    -   id: "kafkaBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withMessageWriter"
+                args: [ref: "kafkaWriter"]
+            -   name: "withMessageGetter"
+                args: ["NAMED"]
+
 streams:
 
     -   name: "spout -> splitter"
@@ -114,10 +143,18 @@ streams:
         to: "builderBolt"
         grouping:
             type: FIELDS
-            args: ["entity", "profile", "message"]
+            args: ["entity", "profile"]
 
     -   name: "builder -> hbase"
         from: "builderBolt"
         to: "hbaseBolt"
         grouping:
+            streamId: "hbase"
+            type: SHUFFLE
+
+    -   name: "builder -> kafka"
+        from: "builderBolt"
+        to: "kafkaBolt"
+        grouping:
+            streamId: "kafka"
             type: SHUFFLE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
new file mode 100644
index 0000000..2257784
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+
+/**
+ * This class handles the mechanics of emitting a profile measurement to a
+ * stream responsible for writing to a specific destination.
+ *
+ * The measurements produced by a profile can be written to one or more
+ * destinations: HBase, Kafka, etc.  Each destination leverages a
+ * separate stream within the topology definition.
+ */
+public interface DestinationHandler {
+
+  /**
+   * Each destination leverages a unique stream.  This method defines
+   * the unique stream identifier.
+   *
+   * The stream identifier must also be declared within the topology
+   * definition.
+   */
+  String getStreamId();
+
+  /**
+   * Declares the output fields for the stream.
+   * @param declarer
+   */
+  void declareOutputFields(OutputFieldsDeclarer declarer);
+
+  /**
+   * Emit the measurement.
+   * @param measurement The measurement to emit.
+   * @param collector The output collector.
+   */
+  void emit(ProfileMeasurement measurement, OutputCollector collector);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
new file mode 100644
index 0000000..4fa5dc1
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
@@ -0,0 +1,58 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+
+/**
+ * Handles emitting a ProfileMeasurement to the stream which writes
+ * profile measurements to HBase.
+ */
+public class HBaseDestinationHandler implements DestinationHandler, Serializable {
+
+  /**
+   * The stream identifier used for this destination.
+   */
+  private String streamId = "hbase";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream(getStreamId(), new Fields("measurement"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+    collector.emit(getStreamId(), new Values(measurement));
+  }
+
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
new file mode 100644
index 0000000..5d8c971
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
@@ -0,0 +1,110 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Handles emitting a ProfileMeasurement to the stream which writes
+ * profile measurements to Kafka.
+ */
+public class KafkaDestinationHandler implements DestinationHandler, Serializable {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(KafkaDestinationHandler.class);
+
+  /**
+   * The stream identifier used for this destination.
+   */
+  private String streamId = "kafka";
+
+  /**
+   * The 'source.type' of messages originating from the Profiler.
+   */
+  private String sourceType = "profiler";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    // the kafka writer expects a field named 'message'
+    declarer.declareStream(getStreamId(), new Fields("message"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+
+    JSONObject message = new JSONObject();
+    message.put("profile", measurement.getDefinition().getProfile());
+    message.put("entity", measurement.getEntity());
+    message.put("period", measurement.getPeriod().getPeriod());
+    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
+    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
+    message.put("timestamp", System.currentTimeMillis());
+    message.put("source.type", sourceType);
+    message.put("is_alert", "true");
+
+    // append each of the triage values to the message
+    measurement.getTriageValues().forEach((key, value) -> {
+
+      if(isValidType(value)) {
+        message.put(key, value);
+
+      } else {
+        LOG.error(String.format("triage expression has invalid type. expect primitive types only. skipping: profile=%s, entity=%s, expression=%s, type=%s",
+                measurement.getDefinition().getProfile(), measurement.getEntity(), key, ClassUtils.getShortClassName(value, "null")));
+      }
+    });
+
+    collector.emit(getStreamId(), new Values(message));
+  }
+
+  /**
+   * The result of a profile's triage expressions must be a string or primitive type.
+   *
+   * This ensures that the value can be easily serialized and appended to a message destined for Kafka.
+   *
+   * @param value The value of a triage expression.
+   * @return True, if the type of the value is valid.
+   */
+  private boolean isValidType(Object value) {
+    return value != null && (value instanceof String || ClassUtils.isPrimitiveOrWrapper(value.getClass()));
+  }
+
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+
+  public void setSourceType(String sourceType) {
+    this.sourceType = sourceType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index a22361d..695f7b7 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -32,15 +32,15 @@ import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.TupleUtils;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -70,9 +70,9 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
    * If a message has not been applied to a Profile in this number of milliseconds,
    * the Profile will be forgotten and its resources will be cleaned up.
    *
-   * The TTL must be at least greater than the period duration.
+   * WARNING: The TTL must be at least as long as the period duration.
    */
-  private long timeToLiveMillis;
+  private long profileTimeToLiveMillis;
 
   /**
    * Maintains the state of a profile which is unique to a profile/entity pair.
@@ -85,10 +85,17 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   private transient JSONParser parser;
 
   /**
+   * The measurements produced by a profile can be written to multiple destinations.  Each
+   * destination is handled by a separate `DestinationHandler`.
+   */
+  private List<DestinationHandler> destinationHandlers;
+
+  /**
    * @param zookeeperUrl The Zookeeper URL that contains the configuration data.
    */
   public ProfileBuilderBolt(String zookeeperUrl) {
     super(zookeeperUrl);
+    this.destinationHandlers = new ArrayList<>();
   }
 
   /**
@@ -107,34 +114,45 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     super.prepare(stormConf, context, collector);
 
-    if(timeToLiveMillis < periodDurationMillis) {
+    if(profileTimeToLiveMillis < periodDurationMillis) {
       throw new IllegalStateException(format(
               "invalid configuration: expect profile TTL (%d) to be greater than period duration (%d)",
-              timeToLiveMillis,
+              profileTimeToLiveMillis,
               periodDurationMillis));
     }
     this.collector = collector;
     this.parser = new JSONParser();
     this.profileCache = CacheBuilder
             .newBuilder()
-            .expireAfterAccess(timeToLiveMillis, TimeUnit.MILLISECONDS)
+            .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
             .build();
   }
 
-  /**
-   * The builder emits a single field, 'measurement', which contains a ProfileMeasurement. A
-   * ProfileMeasurement is emitted when a time window expires and a flush occurs.
-   */
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    // once the time window expires, a complete ProfileMeasurement is emitted
-    declarer.declare(new Fields("measurement", "profile"));
+    if(destinationHandlers.size() == 0) {
+      throw new IllegalStateException("At least one destination handler must be defined.");
+    }
+
+    // each destination will define its own stream
+    destinationHandlers.forEach(dest -> dest.declareOutputFields(declarer));
   }
 
+  /**
+   * Expect to receive either a tick tuple or a telemetry message that needs to be applied
+   * to a profile.
+   * @param input The tuple.
+   */
   @Override
   public void execute(Tuple input) {
     try {
-      doExecute(input);
+      if(TupleUtils.isTick(input)) {
+        handleTick();
+        profileCache.cleanUp();
+
+      } else {
+        handleMessage(input);
+      }
 
     } catch (Throwable e) {
       LOG.error(format("Unexpected failure: message='%s', tuple='%s'", e.getMessage(), input), e);
@@ -146,32 +164,28 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   }
 
   /**
-   * Update the execution environment based on data contained in the
-   * message.  If the tuple is a tick tuple, then flush the profile
-   * and reset the execution environment.
-   * @param input The tuple to execute.
+   * Handles a telemetry message
+   * @param input The tuple.
    */
-  private void doExecute(Tuple input) throws ExecutionException {
-
-    if(TupleUtils.isTick(input)) {
-
-      // when a 'tick' is received, flush the profile and emit the completed profile measurement
-      profileCache.asMap().forEach((key, profileBuilder) -> {
-        if(profileBuilder.isInitialized()) {
-          ProfileMeasurement measurement = profileBuilder.flush();
-          collector.emit(new Values(measurement, profileBuilder.getDefinition()));
-        }
-      });
+  private void handleMessage(Tuple input) throws ExecutionException {
+    JSONObject message = getField("message", input, JSONObject.class);
+    getBuilder(input).apply(message);
+  }
 
-      // cache maintenance
-      profileCache.cleanUp();
+  /**
+   * Handles a tick tuple.
+   */
+  private void handleTick() {
+    profileCache.asMap().forEach((key, profileBuilder) -> {
+      if(profileBuilder.isInitialized()) {
 
-    } else {
+        // flush the profile
+        ProfileMeasurement measurement = profileBuilder.flush();
 
-      // telemetry message provides additional context for 'init' and 'update' expressions
-      JSONObject message = getField("message", input, JSONObject.class);
-      getBuilder(input).apply(message);
-    }
+        // forward the measurement to each destination handler
+        destinationHandlers.forEach(handler -> handler.emit(measurement, collector));
+      }
+    });
   }
 
   /**
@@ -213,7 +227,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   private <T> T getField(String fieldName, Tuple tuple, Class<T> clazz) {
     T value = ConversionUtils.convert(tuple.getValueByField(fieldName), clazz);
     if(value == null) {
-      throw new IllegalStateException(format("invalid tuple received: missing field '%s'", fieldName));
+      throw new IllegalStateException(format("invalid tuple received: missing or invalid field '%s'", fieldName));
     }
 
     return value;
@@ -228,13 +242,17 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     return withPeriodDurationMillis(units.toMillis(duration));
   }
 
-  public ProfileBuilderBolt withTimeToLiveMillis(long timeToLiveMillis) {
-    this.timeToLiveMillis = timeToLiveMillis;
+  public ProfileBuilderBolt withProfileTimeToLiveMillis(long timeToLiveMillis) {
+    this.profileTimeToLiveMillis = timeToLiveMillis;
     return this;
   }
 
-  public ProfileBuilderBolt withTimeToLive(int duration, TimeUnit units) {
-    return withTimeToLiveMillis(units.toMillis(duration));
+  public ProfileBuilderBolt withProfileTimeToLive(int duration, TimeUnit units) {
+    return withProfileTimeToLiveMillis(units.toMillis(duration));
   }
 
+  public ProfileBuilderBolt withDestinationHandler(DestinationHandler handler) {
+    this.destinationHandlers.add(handler);
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
index cdde001..5402ac8 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
@@ -93,7 +93,8 @@ public class ProfileHBaseMapper implements HBaseMapper {
   public Optional<Long> getTTL(Tuple tuple) {
     Optional<Long> result = Optional.empty();
 
-    ProfileConfig profileConfig = (ProfileConfig) tuple.getValueByField("profile");
+    ProfileMeasurement measurement = (ProfileMeasurement) tuple.getValueByField("measurement");
+    ProfileConfig profileConfig = measurement.getDefinition();
     if(profileConfig.getExpires() != null) {
       result = Optional.of(profileConfig.getExpires());
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index ae62699..0fb1fd2 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -134,8 +134,7 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
     Map<String, Object> state = (Map<String, Object>)message;
 
     // is this message needed by this profile?
-    String onlyIf = profile.getOnlyif();
-    if (StringUtils.isBlank(onlyIf) || executor.execute(onlyIf, state, Boolean.class)) {
+    if (executor.execute(profile.getOnlyif(), state, Boolean.class)) {
 
       // what is the name of the entity in this message?
       String entity = executor.execute(profile.getForeach(), state, String.class);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
new file mode 100644
index 0000000..b003ce0
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
@@ -0,0 +1,11 @@
+{
+  "profiles": [
+    {
+      "profile": "example4",
+      "foreach": "ip_src_addr",
+      "onlyif": "protocol == 'HTTP'",
+      "update": { "s": "STATS_ADD(s, length)" },
+      "result": "s"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json
deleted file mode 100644
index 8f24cea..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "example1",
-      "foreach": "ip_src_addr",
-      "init": {},
-      "update": {},
-      "result": "TO_INTEGER(10.0)"
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
new file mode 100644
index 0000000..c3f2584
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
@@ -0,0 +1,203 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the KafkaDestinationHandler emits each ProfileMeasurement as a JSONObject.
+ */
+public class KafkaDestinationHandlerTest {
+
+  /**
+   * {
+   *   "profile": "profile-one-destination",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String profileDefinition;
+
+  private KafkaDestinationHandler handler;
+  private ProfileConfig profile;
+  private OutputCollector collector;
+
+  @Before
+  public void setup() throws Exception {
+    handler = new KafkaDestinationHandler();
+    profile = createDefinition(profileDefinition);
+    collector = Mockito.mock(OutputCollector.class);
+  }
+
+  /**
+   * The handler must serialize the ProfileMeasurement into a JSONObject.
+   */
+  @Test
+  public void testSerialization() throws Exception {
+
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", "triage-value"))
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+
+    // expect exactly one emitted tuple whose first field is a JSONObject
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+
+    // validate each field of the emitted json against the measurement
+    JSONObject actual = (JSONObject) values.get(0);
+    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
+    assertEquals(measurement.getEntity(), actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+    assertNotNull(actual.get("timestamp"));
+    assertEquals("profiler", actual.get("source.type"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits the types of values
+   * that can result from a triage expression.  Only primitive types and Strings are allowed.
+   */
+  @Test
+  public void testInvalidType() throws Exception {
+
+    // create one triage value of an invalid type and one of a valid type
+    Map<String, Object> triageValues = ImmutableMap.of(
+            "invalid", new OnlineStatisticsProvider(),
+            "valid", 4);
+
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(triageValues)
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+
+    // only the invalid triage value should have been skipped; all other fields should be present
+    JSONObject actual = (JSONObject) values.get(0);
+    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
+    assertEquals(measurement.getEntity(), actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+    assertNotNull(actual.get("timestamp"));
+    assertEquals("profiler", actual.get("source.type"));
+
+    // the invalid triage value should be skipped due to its type
+    assertFalse(actual.containsKey("invalid"));
+
+    // but the valid triage value should still be there
+    assertEquals(triageValues.get("valid"), actual.get("valid"));
+  }
+
+  /**
+   * An Integer is a valid type for a triage value destined for Kafka.  The value must appear
+   * in the emitted JSONObject under the same key and equal to the original triage value.
+   */
+  @Test
+  public void testIntegerIsValidType() throws Exception {
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", 123))
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+    JSONObject actual = (JSONObject) values.get(0);
+
+    // the integer triage value is present in the emitted message
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+  }
+
+  /**
+   * A String is a valid type for a triage value destined for Kafka.  The value must appear
+   * in the emitted JSONObject under the same key and equal to the original triage value.
+   */
+  @Test
+  public void testStringIsValidType() throws Exception {
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", "value"))
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+    JSONObject actual = (JSONObject) values.get(0);
+
+    // the string triage value is present in the emitted message
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+  }
+
+  /**
+   * Creates a profile definition by loading a string of JSON as a ProfileConfig.
+   * @param json The string of JSON.
+   */
+  private ProfileConfig createDefinition(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 8d610bd..935fe57 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -28,6 +28,7 @@ import org.apache.metron.profiler.ProfileBuilder;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.storm.Constants;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.json.simple.JSONObject;
@@ -45,7 +46,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -121,9 +124,13 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
     bolt.setCuratorFramework(client);
     bolt.setTreeCache(cache);
     bolt.withPeriodDuration(10, TimeUnit.MINUTES);
-    bolt.withTimeToLive(30, TimeUnit.MINUTES);
-    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+    bolt.withProfileTimeToLive(30, TimeUnit.MINUTES);
+
+    // define the valid destinations for the profiler
+    bolt.withDestinationHandler(new HBaseDestinationHandler());
+    bolt.withDestinationHandler(new KafkaDestinationHandler());
 
+    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
     return bolt;
   }
 
@@ -242,11 +249,11 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
   }
 
   /**
-   * A ProfileMeasurement should be emitted for each profile/entity currently being tracked
-   * by the bolt.
+   * A ProfileMeasurement is built for each profile/entity pair.  A measurement for each profile/entity
+   * pair should be emitted.
    */
   @Test
-  public void testEmitMeasurementsOnFlush() throws Exception {
+  public void testEmitMeasurements() throws Exception {
 
     // setup
     ProfileBuilderBolt bolt = createBolt();
@@ -267,33 +274,64 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
 
     // capture the ProfileMeasurement that should be emitted
     ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(outputCollector, times(2)).emit(arg.capture());
 
-    // validate
-    for(Values value : arg.getAllValues()) {
+    // validate emitted measurements for hbase
+    verify(outputCollector, atLeastOnce()).emit(eq("hbase"), arg.capture());
+    for (Values value : arg.getAllValues()) {
 
       ProfileMeasurement measurement = (ProfileMeasurement) value.get(0);
-      ProfileConfig definition = (ProfileConfig) value.get(1);
+      ProfileConfig definition = measurement.getDefinition();
 
-      if(StringUtils.equals(definitionTwo.getProfile(), definition.getProfile())) {
+      if (StringUtils.equals(definitionTwo.getProfile(), definition.getProfile())) {
 
         // validate measurement emitted for profile two
         assertEquals(definitionTwo, definition);
         assertEquals(entity, measurement.getEntity());
         assertEquals(definitionTwo.getProfile(), measurement.getProfileName());
-        assertEquals(1, (int) convert(measurement.getValue(), Integer.class));
+        assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class));
 
-      } else if(StringUtils.equals(definitionOne.getProfile(), definition.getProfile())) {
+      } else if (StringUtils.equals(definitionOne.getProfile(), definition.getProfile())) {
 
         // validate measurement emitted for profile one
         assertEquals(definitionOne, definition);
         assertEquals(entity, measurement.getEntity());
         assertEquals(definitionOne.getProfile(), measurement.getProfileName());
-        assertEquals(1, (int) convert(measurement.getValue(), Integer.class));
+        assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class));
 
       } else {
         fail();
       }
     }
   }
+
+  /**
+   * A ProfileMeasurement is built for each profile/entity pair.  The measurement should be emitted to each
+   * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
+   */
+  @Test
+  public void testDestinationHandlers() throws Exception {
+
+    // setup
+    ProfileBuilderBolt bolt = createBolt();
+    ProfileConfig definitionOne = createDefinition(profileOne);
+
+    // apply the message to the first profile
+    final String entity = (String) messageOne.get("ip_src_addr");
+    Tuple tupleOne = createTuple(entity, messageOne, definitionOne);
+    bolt.execute(tupleOne);
+
+    // trigger a flush of the profile
+    bolt.execute(mockTickTuple());
+
+    // capture the values that should be emitted
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+
+    // validate that exactly one measurement was emitted on the "hbase" stream
+    verify(outputCollector, times(1)).emit(eq("hbase"), arg.capture());
+    assertTrue(arg.getValue().get(0) instanceof ProfileMeasurement);
+
+    // validate that exactly one JSON message was emitted on the "kafka" stream
+    verify(outputCollector, times(1)).emit(eq("kafka"), arg.capture());
+    assertTrue(arg.getValue().get(0) instanceof JSONObject);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
index 7e0606e..bdeef0b 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.metron.profiler.bolt;
 
+import org.apache.metron.common.configuration.profiler.ProfileResult;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.profiler.ProfileMeasurement;
@@ -59,18 +60,18 @@ public class ProfileHBaseMapperTest {
     mapper = new ProfileHBaseMapper();
     mapper.setRowKeyBuilder(rowKeyBuilder);
 
+    profile = new ProfileConfig("profile", "ip_src_addr", new ProfileResult("2 + 2"));
+
     measurement = new ProfileMeasurement()
             .withProfileName("profile")
             .withEntity("entity")
             .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withValue(22);
-
-    profile = new ProfileConfig();
+            .withProfileValue(22)
+            .withDefinition(profile);
 
     // the tuple will contain the original message
     tuple = mock(Tuple.class);
     when(tuple.getValueByField(eq("measurement"))).thenReturn(measurement);
-    when(tuple.getValueByField(eq("profile"))).thenReturn(profile);
   }
 
   /**
@@ -91,16 +92,8 @@ public class ProfileHBaseMapperTest {
    */
   @Test
   public void testExpiresUndefined() throws Exception {
-
-    // do not set the TTL on the profile
-    ProfileConfig profileNoTTL = new ProfileConfig();
-
-    // the tuple references the profile with the missing TTL
-    Tuple tupleNoTTL = mock(Tuple.class);
-    when(tupleNoTTL.getValueByField(eq("profile"))).thenReturn(profileNoTTL);
-
     // the TTL should not be defined
-    Optional<Long> actual = mapper.getTTL(tupleNoTTL);
+    Optional<Long> actual = mapper.getTTL(tuple);
     Assert.assertFalse(actual.isPresent());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index 0ac1c33..0879835 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -63,7 +63,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    *        "onlyif": "protocol == 'HTTP'",
    *        "init": {},
    *        "update": {},
-   *        "result": 2
+   *        "result": "2"
    *      }
    *   ]
    * }
@@ -80,7 +80,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    *        "onlyif": "false",
    *        "init": {},
    *        "update": {},
-   *        "result": 2
+   *        "result": "2"
    *      }
    *   ]
    * }
@@ -96,7 +96,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    *        "foreach": "ip_src_addr",
    *        "init": {},
    *        "update": {},
-   *        "result": 2
+   *        "result": "2"
    *      }
    *   ]
    * }
@@ -113,7 +113,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    *        "onlyif": "NOT-VALID",
    *        "init": {},
    *        "update": {},
-   *        "result": 2
+   *        "result": "2"
    *      }
    *   ]
    * }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index e09f7f1..357908d 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -21,7 +21,6 @@
 package org.apache.metron.profiler.integration;
 
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.lang.math.NumberUtils;
 import org.apache.commons.math.util.MathUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -39,6 +38,8 @@ import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
+import org.apache.metron.statistics.StatisticsProvider;
 import org.apache.metron.test.mock.MockHTable;
 import org.junit.After;
 import org.junit.Assert;
@@ -111,6 +112,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   private static final String tableName = "profiler";
   private static final String columnFamily = "P";
   private static final double epsilon = 0.001;
+  private static final String inputTopic = Constants.INDEXING_TOPIC;
+  private static final String outputTopic = "profiles";
 
   /**
    * A TableProvider that allows us to mock HBase.
@@ -135,7 +138,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
@@ -160,7 +163,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // expect 2 values written by the profile; one for 10.0.0.2 and another for 10.0.0.3
     final int expected = 2;
@@ -193,7 +196,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
@@ -208,29 +211,29 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     ));
   }
 
+  /**
+   * Tests the fourth example contained within the README.
+   */
   @Test
-  public void testWriteInteger() throws Exception {
+  public void testExample4() throws Exception {
 
-    setup(TEST_RESOURCES + "/config/zookeeper/write-integer");
+    setup(TEST_RESOURCES + "/config/zookeeper/readme-example-4");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
-
-    // expect 3 values written by the profile; one for each host
-    final int expected = 3;
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() >= expected,
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
             timeout(seconds(90)));
 
-    // verify - the profile sees messages from 3 hosts; 10.0.0.[1-3]
-    List<Integer> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Integer.class);
-    Assert.assertEquals(3, actuals.size());
+    // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
+    byte[] column = columnBuilder.getColumnQualifier("value");
+    List<OnlineStatisticsProvider> actuals = read(profilerTable.getPutLog(), columnFamily, column, OnlineStatisticsProvider.class);
 
-    // verify - the profile writes 10 as an integer
+    // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
     Assert.assertTrue(actuals.stream().anyMatch(val ->
-            MathUtils.equals(val, 10.0, epsilon)
+            MathUtils.equals(val.getMean(), 20.0, epsilon)
     ));
   }
 
@@ -241,7 +244,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
@@ -290,7 +293,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       setProperty("kafka.start", SpoutConfig.Offset.BEGINNING.name());
       setProperty("profiler.workers", "1");
       setProperty("profiler.executors", "0");
-      setProperty("profiler.input.topic", Constants.INDEXING_TOPIC);
+      setProperty("profiler.input.topic", inputTopic);
+      setProperty("profiler.output.topic", outputTopic);
       setProperty("profiler.period.duration", "20");
       setProperty("profiler.period.duration.units", "SECONDS");
       setProperty("profiler.ttl", "30");
@@ -310,8 +314,9 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     zkComponent = getZKServerComponent(topologyProperties);
 
     // create the input topic
-    kafkaComponent = getKafkaComponent(topologyProperties,
-            Arrays.asList(new KafkaComponent.Topic(Constants.INDEXING_TOPIC, 1)));
+    kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList(
+            new KafkaComponent.Topic(inputTopic, 1),
+            new KafkaComponent.Topic(outputTopic, 1)));
 
     // upload profiler configuration to zookeeper
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()