You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2017/03/02 22:02:12 UTC

[1/2] incubator-metron git commit: METRON-701 Triage Metrics Produced by the Profiler (nickwallen) closes apache/incubator-metron#449

Repository: incubator-metron
Updated Branches:
  refs/heads/master e66284993 -> 818b0b17b


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
index d8f8acf..0ac1314 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
@@ -17,12 +17,18 @@
  */
 package org.apache.metron.common.configuration.profiler;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
- * The user defined configuration values required to generate a Profile.
+ * The definition of a single Profile.
  */
 public class ProfileConfig implements Serializable {
 
@@ -46,7 +52,7 @@ public class ProfileConfig implements Serializable {
    * is only applied to a profile if this condition is true. This allows a profile
    * to filter the messages that it receives.
    */
-  private String onlyif;
+  private String onlyif = "true";
 
   /**
    * A set of expressions that is executed at the start of a window period.  A map is
@@ -71,12 +77,11 @@ public class ProfileConfig implements Serializable {
   private List<String> groupBy = new ArrayList<>();
 
   /**
-   * A Stellar expression that is executed when the window period expires.  The
-   * expression is expected to in some way summarize the messages that were applied
-   * to the profile over the window period.  The expression must result in a numeric
-   * value such as a Double, Long, Float, Short, or Integer.
+   * Stellar expression(s) that are executed when the window period expires.  The
+   * expression(s) are expected to in some way summarize the messages that were applied
+   * to the profile over the window period.
    */
-  private String result;
+  private ProfileResult result;
 
   /**
    * How long the data created by this Profile will be retained.  After this period of time the
@@ -84,6 +89,23 @@ public class ProfileConfig implements Serializable {
    */
   private Long expires;
 
+  /**
+   * A profile definition requires at the very least the profile name, the foreach, and result
+   * expressions.
+   * @param profile The name of the profile.
+   * @param foreach The foreach expression of the profile.
+   * @param result The result expression of the profile.
+   */
+  public ProfileConfig(
+          @JsonProperty(value = "profile", required = true) String profile,
+          @JsonProperty(value = "foreach", required = true) String foreach,
+          @JsonProperty(value = "result",  required = true) ProfileResult result) {
+
+    this.profile = profile;
+    this.foreach = foreach;
+    this.result = result;
+  }
+
   public String getProfile() {
     return profile;
   }
@@ -132,11 +154,11 @@ public class ProfileConfig implements Serializable {
     this.groupBy = groupBy;
   }
 
-  public String getResult() {
+  public ProfileResult getResult() {
     return result;
   }
 
-  public void setResult(String result) {
+  public void setResult(ProfileResult result) {
     this.result = result;
   }
 
@@ -149,20 +171,6 @@ public class ProfileConfig implements Serializable {
   }
 
   @Override
-  public String toString() {
-    return "ProfileConfig{" +
-            "profile='" + profile + '\'' +
-            ", foreach='" + foreach + '\'' +
-            ", onlyif='" + onlyif + '\'' +
-            ", init=" + init +
-            ", update=" + update +
-            ", groupBy=" + groupBy +
-            ", result='" + result + '\'' +
-            ", expires=" + expires +
-            '}';
-  }
-
-  @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
@@ -177,7 +185,6 @@ public class ProfileConfig implements Serializable {
     if (groupBy != null ? !groupBy.equals(that.groupBy) : that.groupBy != null) return false;
     if (result != null ? !result.equals(that.result) : that.result != null) return false;
     return expires != null ? expires.equals(that.expires) : that.expires == null;
-
   }
 
   @Override
@@ -192,4 +199,18 @@ public class ProfileConfig implements Serializable {
     result1 = 31 * result1 + (expires != null ? expires.hashCode() : 0);
     return result1;
   }
+
+  @Override
+  public String toString() {
+    return "ProfileConfig{" +
+            "profile='" + profile + '\'' +
+            ", foreach='" + foreach + '\'' +
+            ", onlyif='" + onlyif + '\'' +
+            ", init=" + init +
+            ", update=" + update +
+            ", groupBy=" + groupBy +
+            ", result=" + result +
+            ", expires=" + expires +
+            '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
new file mode 100644
index 0000000..9a42426
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration.profiler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Defines the 'result' field of a Profile definition.
+ */
+public class ProfileResult {
+
+  /**
+   * A Stellar expression that is executed to produce
+   * a measurement that is persisted in the profile store.
+   */
+  @JsonProperty("profile")
+  private ProfileResultExpressions profileExpressions;
+
+  /**
+   * A set of named Stellar expressions that are executed
+   * to produce a measurement that can be used for threat
+   * triage.
+   */
+  @JsonProperty("triage")
+  private ProfileTriageExpressions triageExpressions;
+
+  @JsonCreator
+  public ProfileResult(
+          @JsonProperty(value = "profile", required = true) ProfileResultExpressions profileExpressions,
+          @JsonProperty(value = "triage") ProfileTriageExpressions triageExpressions) {
+    this.profileExpressions = profileExpressions;
+    this.triageExpressions = triageExpressions != null ? triageExpressions : new ProfileTriageExpressions();
+  }
+
+  /**
+   * Allows a single result expression to be interpreted as a 'profile' expression.
+   *
+   * The profile definition
+   *    <pre>{@code {..., "result": "2 + 2" }}</pre>
+   * is equivalent to
+   *    <pre>{@code {..., "result": { "profile": "2 + 2" }}}</pre>
+   *
+   * @param expression The result expression.
+   */
+  public ProfileResult(String expression) {
+    this.profileExpressions = new ProfileResultExpressions(expression);
+    this.triageExpressions = new ProfileTriageExpressions();
+  }
+
+  public ProfileResultExpressions getProfileExpressions() {
+    return profileExpressions;
+  }
+
+  public void setProfileExpressions(ProfileResultExpressions profileExpressions) {
+    this.profileExpressions = profileExpressions;
+  }
+
+  public ProfileTriageExpressions getTriageExpressions() {
+    return triageExpressions;
+  }
+
+  public void setTriageExpressions(ProfileTriageExpressions triageExpressions) {
+    this.triageExpressions = triageExpressions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ProfileResult that = (ProfileResult) o;
+    if (profileExpressions != null ? !profileExpressions.equals(that.profileExpressions) : that.profileExpressions != null)
+      return false;
+    return triageExpressions != null ? triageExpressions.equals(that.triageExpressions) : that.triageExpressions == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = profileExpressions != null ? profileExpressions.hashCode() : 0;
+    result = 31 * result + (triageExpressions != null ? triageExpressions.hashCode() : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
new file mode 100644
index 0000000..1bca716
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration.profiler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * A Stellar expression that is executed to produce a single
+ * measurement that is persisted within the profile store.
+ */
+public class ProfileResultExpressions {
+
+  @JsonIgnore
+  private String expression;
+
+  @JsonCreator
+  public ProfileResultExpressions(String expression) {
+    this.expression = expression;
+  }
+
+  public String getExpression() {
+    return expression;
+  }
+
+  public void setExpression(String expression) {
+    this.expression = expression;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    ProfileResultExpressions that = (ProfileResultExpressions) o;
+    return expression != null ? expression.equals(that.expression) : that.expression == null;
+  }
+
+  @Override
+  public int hashCode() {
+    return expression != null ? expression.hashCode() : 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
new file mode 100644
index 0000000..da74f64
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.configuration.profiler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A set of Stellar expressions that are executed to produce a single
+ * measurement that can be interrogated by the threat triage process.
+ *
+ * The result of evaluating each expression is made available, keyed
+ * by the given name, to the threat triage process.
+ */
+public class ProfileTriageExpressions {
+
+  /**
+   * A set of named Stellar expressions.  The name of the expression
+   * serves as the key and the value is the expression itself.
+   *
+   * Evaluating the expression(s) must result in a basic data type
+   * or map of basic data types that can be serialized.
+   */
+  @JsonIgnore
+  private Map<String, String> expressions;
+
+  @JsonCreator
+  public ProfileTriageExpressions(Map<String, String> expressions) {
+    this.expressions = expressions;
+  }
+
+  @JsonCreator
+  public ProfileTriageExpressions() {
+    this.expressions = new HashMap<>();
+  }
+
+  /**
+   * Returns the expression associated with a given name.
+   * @param name The name of the expression.
+   * @return A Stellar expression.
+   */
+  public String getExpression(String name) {
+    return expressions.get(name);
+  }
+
+  public Map<String, String> getExpressions() {
+    return expressions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
index fcc03fd..cd651bd 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * The user defined configuration values required for the Profiler.
+ * The definition for the entire Profiler, which may contain many Profile definitions.
  */
 public class ProfilerConfig implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
index 19b6a5a..c098787 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java
@@ -26,7 +26,7 @@ import java.io.IOException;
 import java.io.InputStream;
 
 /**
- * Created by nallen on 7/28/16.
+ * Used to manage configurations for the Profiler.
  */
 public class ProfilerConfigurations extends Configurations {
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
index 856d18e..c61efc5 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
@@ -22,37 +22,20 @@ import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
 
 import java.io.*;
 
 public enum JSONUtils {
   INSTANCE;
-  private static ThreadLocal<ObjectMapper> _mapper = new ThreadLocal<ObjectMapper>() {
-    /**
-     * Returns the current thread's "initial value" for this
-     * thread-local variable.  This method will be invoked the first
-     * time a thread accesses the variable with the {@link #get}
-     * method, unless the thread previously invoked the {@link #set}
-     * method, in which case the {@code initialValue} method will not
-     * be invoked for the thread.  Normally, this method is invoked at
-     * most once per thread, but it may be invoked again in case of
-     * subsequent invocations of {@link #remove} followed by {@link #get}.
-     * <p>
-     * <p>This implementation simply returns {@code null}; if the
-     * programmer desires thread-local variables to have an initial
-     * value other than {@code null}, {@code ThreadLocal} must be
-     * subclassed, and this method overridden.  Typically, an
-     * anonymous inner class will be used.
-     *
-     * @return the initial value for this thread-local
-     */
-    @Override
-    protected ObjectMapper initialValue() {
-      ObjectMapper ret = new ObjectMapper();
-      ret.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-      return ret;
-    }
-  };
+
+  private static ThreadLocal<JSONParser> _parser = ThreadLocal.withInitial(() ->
+          new JSONParser());
+
+  private static ThreadLocal<ObjectMapper> _mapper = ThreadLocal.withInitial(() ->
+          new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL));
 
   public <T> T load(InputStream is, TypeReference<T> ref) throws IOException {
     return _mapper.get().readValue(is, ref);
@@ -93,4 +76,11 @@ public enum JSONUtils {
   public byte[] toJSON(Object config) throws JsonProcessingException {
     return _mapper.get().writeValueAsBytes(config);
   }
+
+  /**
+   * Transforms a bean (aka POJO) to a JSONObject.
+   */
+  public JSONObject toJSONObject(Object o) throws JsonProcessingException, ParseException {
+    return (JSONObject) _parser.get().parse(toJSON(o, false));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
new file mode 100644
index 0000000..a0e115d
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
@@ -0,0 +1,207 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.common.configuration.profiler;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.utils.JSONUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Ensures that Profile definitions have the expected defaults
+ * and can be (de)serialized to and from JSON.
+ */
+public class ProfileConfigTest {
+
+  /**
+   * {
+   *    "profile": "test",
+   *    "foreach": "ip_src_addr",
+   *    "update": {},
+   *    "result": "2 + 2"
+   * }
+   */
+  @Multiline
+  private String onlyIfDefault;
+
+  /**
+   * The 'onlyif' field should default to 'true' when it is not specified.
+   */
+  @Test
+  public void testOnlyIfDefault() throws IOException {
+    ProfileConfig profile = JSONUtils.INSTANCE.load(onlyIfDefault, ProfileConfig.class);
+    assertEquals("true", profile.getOnlyif());
+  }
+
+  /**
+   * {
+   *    "foreach": "ip_src_addr",
+   *    "update": {},
+   *    "result": "2 + 2"
+   * }
+   */
+  @Multiline
+  private String nameMissing;
+
+  /**
+   * The 'profile' field, which names the profile, must be defined.
+   */
+  @Test(expected = JsonMappingException.class)
+  public void testNameMissing() throws IOException {
+    JSONUtils.INSTANCE.load(nameMissing, ProfileConfig.class);
+  }
+
+  /**
+   * {
+   *    "profile": "test",
+   *    "update": {},
+   *    "result": "2 + 2"
+   * }
+   */
+  @Multiline
+  private String foreachMissing;
+
+  /**
+   * The 'foreach' field must be defined.
+   */
+  @Test(expected = JsonMappingException.class)
+  public void testForeachMissing() throws IOException {
+    JSONUtils.INSTANCE.load(foreachMissing, ProfileConfig.class);
+  }
+
+  /**
+   * {
+   *    "profile": "test",
+   *    "foreach": "ip_src_addr",
+   *    "update": {}
+   * }
+   */
+  @Multiline
+  private String resultMissing;
+
+  /**
+   * The 'result' field must be defined.
+   */
+  @Test(expected = JsonMappingException.class)
+  public void testResultMissing() throws IOException {
+    JSONUtils.INSTANCE.load(resultMissing, ProfileConfig.class);
+  }
+
+  /**
+   * {
+   *    "profile": "test",
+   *    "foreach": "ip_src_addr",
+   *    "update": {},
+   *    "result": {}
+   * }
+   */
+  @Multiline
+  private String resultMissingProfileExpression;
+
+  /**
+   * The 'result' field must contain the 'profile' expression used to store the profile measurement.
+   */
+  @Test(expected = JsonMappingException.class)
+  public void testResultMissingProfileExpression() throws IOException {
+    JSONUtils.INSTANCE.load(resultMissingProfileExpression, ProfileConfig.class);
+  }
+
+  /**
+   * {
+   *    "profile": "test",
+   *    "foreach": "ip_src_addr",
+   *    "update": {},
+   *    "result": "2 + 2"
+   * }
+   */
+  @Multiline
+  private String resultWithExpression;
+
+  /**
+   * If the 'result' field has only a single expression, it should be treated as
+   * the 'profile' expression used to store the profile measurement.
+   */
+  @Test
+  public void testResultWithExpression() throws IOException {
+    ProfileConfig profile = JSONUtils.INSTANCE.load(resultWithExpression, ProfileConfig.class);
+    assertEquals("2 + 2", profile.getResult().getProfileExpressions().getExpression());
+
+    // no triage expressions expected
+    assertEquals(0, profile.getResult().getTriageExpressions().getExpressions().size());
+  }
+
+  /**
+   * {
+   *    "profile": "test",
+   *    "foreach": "ip_src_addr",
+   *    "update": {},
+   *    "result": {
+   *      "profile": "2 + 2"
+   *    }
+   * }
+   */
+  @Multiline
+  private String resultWithProfileOnly;
+
+  /**
+   * The result's 'triage' field is optional.
+   */
+  @Test
+  public void testResultWithProfileOnly() throws IOException {
+    ProfileConfig profile = JSONUtils.INSTANCE.load(resultWithProfileOnly, ProfileConfig.class);
+    assertEquals("2 + 2", profile.getResult().getProfileExpressions().getExpression());
+
+    // no triage expressions expected
+    assertEquals(0, profile.getResult().getTriageExpressions().getExpressions().size());
+  }
+
+  /**
+   * {
+   *    "profile": "test",
+   *    "foreach": "ip_src_addr",
+   *    "update": {},
+   *    "result": {
+   *      "profile": "2 + 2",
+   *      "triage": {
+   *        "eight": "4 + 4",
+   *        "sixteen": "8 + 8"
+   *      }
+   *    }
+   * }
+   */
+  @Multiline
+  private String resultWithTriage;
+
+  /**
+   * The result's 'triage' field can contain many named expressions.
+   */
+  @Test
+  public void testResultWithTriage() throws IOException {
+    ProfileConfig profile = JSONUtils.INSTANCE.load(resultWithTriage, ProfileConfig.class);
+
+    assertEquals("4 + 4", profile.getResult().getTriageExpressions().getExpression("eight"));
+    assertEquals("8 + 8", profile.getResult().getTriageExpressions().getExpression("sixteen"));
+  }
+}


[2/2] incubator-metron git commit: METRON-701 Triage Metrics Produced by the Profiler (nickwallen) closes apache/incubator-metron#449

Posted by ni...@apache.org.
METRON-701 Triage Metrics Produced by the Profiler (nickwallen) closes apache/incubator-metron#449


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/818b0b17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/818b0b17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/818b0b17

Branch: refs/heads/master
Commit: 818b0b17b131d875259178e61e59d32d02ae792a
Parents: e662849
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Mar 2 17:01:41 2017 -0500
Committer: Nick Allen <ni...@nickallen.org>
Committed: Thu Mar 2 17:01:41 2017 -0500

----------------------------------------------------------------------
 dependencies_with_url.csv                       |   1 +
 .../metron/profiler/client/ProfileWriter.java   |   4 +-
 .../apache/metron/profiler/ProfileBuilder.java  |  34 ++-
 .../metron/profiler/ProfileMeasurement.java     | 109 ++++++----
 .../apache/metron/profiler/ProfilePeriod.java   |   7 +
 .../profiler/hbase/ValueOnlyColumnBuilder.java  |   2 +-
 .../metron/profiler/ProfileBuilderTest.java     |  93 ++++++++-
 .../profiler/hbase/SaltyRowKeyBuilderTest.java  |   2 +-
 metron-analytics/metron-profiler/README.md      |  88 +++++---
 metron-analytics/metron-profiler/pom.xml        |  24 +++
 .../src/main/config/profiler.properties         |   2 +
 .../src/main/flux/profiler/remote.yaml          |  45 +++-
 .../profiler/bolt/DestinationHandler.java       |  56 +++++
 .../profiler/bolt/HBaseDestinationHandler.java  |  58 ++++++
 .../profiler/bolt/KafkaDestinationHandler.java  | 110 ++++++++++
 .../profiler/bolt/ProfileBuilderBolt.java       | 100 +++++----
 .../profiler/bolt/ProfileHBaseMapper.java       |   3 +-
 .../profiler/bolt/ProfileSplitterBolt.java      |   3 +-
 .../zookeeper/readme-example-4/profiler.json    |  11 +
 .../zookeeper/write-integer/profiler.json       |  11 -
 .../bolt/KafkaDestinationHandlerTest.java       | 203 ++++++++++++++++++
 .../profiler/bolt/ProfileBuilderBoltTest.java   |  64 ++++--
 .../profiler/bolt/ProfileHBaseMapperTest.java   |  19 +-
 .../profiler/bolt/ProfileSplitterBoltTest.java  |   8 +-
 .../integration/ProfilerIntegrationTest.java    |  45 ++--
 .../configuration/profiler/ProfileConfig.java   |  71 ++++---
 .../configuration/profiler/ProfileResult.java   |  99 +++++++++
 .../profiler/ProfileResultExpressions.java      |  57 +++++
 .../profiler/ProfileTriageExpressions.java      |  67 ++++++
 .../configuration/profiler/ProfilerConfig.java  |   2 +-
 .../profiler/ProfilerConfigurations.java        |   2 +-
 .../apache/metron/common/utils/JSONUtils.java   |  42 ++--
 .../profiler/ProfileConfigTest.java             | 207 +++++++++++++++++++
 33 files changed, 1404 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 1d428c3..819ab84 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -12,6 +12,7 @@ org.fusesource.jansi:jansi:jar:1.11:compile,Apache v2,https://github.com/fusesou
 de.javakaffee:kryo-serializers:jar:0.38:compile,Apache v2,https://github.com/magro/kryo-serializers
 com.tdunning:t-digest:jar:3.1:compile,Apache v2,https://github.com/tdunning/t-digest
 com.esotericsoftware:kryo:jar:3.0.3:compile,New BSD License,http://code.google.com/p/kryo
+com.esotericsoftware:kryo-shaded:jar:3.0.3:compile,New BSD License,http://code.google.com/p/kryo
 com.esotericsoftware.kryo:kryo:jar:2.21:compile,New BSD License,http://code.google.com/p/kryo/
 com.esotericsoftware.minlog:minlog:jar:1.2:compile,New BSD License,http://code.google.com/p/minlog/
 com.esotericsoftware.minlog:minlog:jar:1.3.0:compile,New BSD License,http://code.google.com/p/minlog/

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index 6e2b11e..317227b 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -71,7 +71,7 @@ public class ProfileWriter {
     for(int i=0; i<count; i++) {
 
       // generate the next value that should be written
-      Object nextValue = valueGenerator.apply(m.getValue());
+      Object nextValue = valueGenerator.apply(m.getProfileValue());
 
       // create a measurement for the next profile period to be written
       ProfilePeriod next = m.getPeriod().next();
@@ -80,7 +80,7 @@ public class ProfileWriter {
               .withEntity(prototype.getEntity())
               .withPeriod(next.getStartTimeMillis(), prototype.getPeriod().getDurationMillis(), TimeUnit.MILLISECONDS)
               .withGroups(group)
-              .withValue(nextValue);
+              .withProfileValue(nextValue);
 
       write(m);
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
index 4c38fac..b444ba1 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
@@ -26,6 +26,7 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.configuration.profiler.ProfileResult;
 import org.apache.metron.common.dsl.Context;
 import org.apache.metron.common.dsl.ParseException;
 import org.apache.metron.common.dsl.StellarFunctions;
@@ -40,9 +41,11 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static java.lang.String.format;
 
@@ -140,26 +143,36 @@ public class ProfileBuilder implements Serializable {
    *
    * Completes and emits the ProfileMeasurement.  Clears all state in preparation for
    * the next window period.
+   *
    * @return Returns the completed profile measurement.
    */
   public ProfileMeasurement flush() {
     LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity);
 
-    // execute the 'result' expression
+    // execute the 'profile' expression(s)
     @SuppressWarnings("unchecked")
-    Object value = execute(definition.getResult(), new JSONObject(), "result");
+    Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile");
+
+    // execute the 'triage' expression(s)
+    Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions()
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                    e -> e.getKey(),
+                    e -> execute(e.getValue(), "result/triage")));
 
     // execute the 'groupBy' expression(s) - can refer to value of 'result' expression
-    List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", value), "groupBy");
+    List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", profileValue), "groupBy");
 
     isInitialized = false;
-
     return new ProfileMeasurement()
             .withProfileName(profileName)
             .withEntity(entity)
             .withGroups(groups)
             .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS)
-            .withValue(value);
+            .withProfileValue(profileValue)
+            .withTriageValues(triageValues)
+            .withDefinition(definition);
   }
 
   /**
@@ -181,6 +194,17 @@ public class ProfileBuilder implements Serializable {
   }
 
   /**
+   * Executes an expression contained within the profile definition.
+   * @param expression The expression to execute.
+   * @param expressionType The type of expression; init, update, result.  Provides additional context if expression execution fails.
+   * @return The result of executing the expression.
+   */
+  private Object execute(String expression, String expressionType) {
+    return execute(expression, Collections.emptyMap(), expressionType);
+  }
+
+
+  /**
    * Executes a set of expressions whose results need to be assigned to a variable.
    * @param expressions Maps the name of a variable to the expression whose result should be assigned to it.
    * @param transientState Additional transient state provided to the expression.

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index bbd17a5..e9ac945 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -20,8 +20,11 @@
 
 package org.apache.metron.profiler;
 
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -43,11 +46,6 @@ public class ProfileMeasurement {
   private String entity;
 
   /**
-   * The actual measurement itself.
-   */
-  private Object value;
-
-  /**
    * The 'groups' used to sort the Profile data. The groups are the result of
    * executing the Profile's 'groupBy' expression.
    */
@@ -58,6 +56,24 @@ public class ProfileMeasurement {
    */
   private ProfilePeriod period;
 
+  /**
+   * The profile definition that resulted in this measurement.
+   */
+  private ProfileConfig definition;
+
+  /**
+   * The result of evaluating the profile expression.
+   */
+  private Object profileValue;
+
+  /**
+   * The result of evaluating the triage expression(s).
+   *
+   * A profile can generate one or more values that can be used during the
+   * threat triage process.  Each value is given a unique name.
+   */
+  private Map<String, Object> triageValues;
+
   public ProfileMeasurement() {
     this.groups = Collections.emptyList();
   }
@@ -72,11 +88,6 @@ public class ProfileMeasurement {
     return this;
   }
 
-  public ProfileMeasurement withValue(Object value) {
-    this.value = value;
-    return this;
-  }
-
   public ProfileMeasurement withGroups(List<Object> groups) {
     this.groups = groups;
     return this;
@@ -87,58 +98,74 @@ public class ProfileMeasurement {
     return this;
   }
 
+  public ProfileMeasurement withDefinition(ProfileConfig definition) {
+    this.definition = definition;
+    return this;
+  }
+
+  public ProfileMeasurement withProfileValue(Object profileValue) {
+    this.profileValue = profileValue;
+    return this;
+  }
+
+  public ProfileMeasurement withTriageValues(Map<String, Object> triageValues) {
+    this.triageValues = triageValues;
+    return this;
+  }
+
   public String getProfileName() {
     return profileName;
   }
 
+  public void setProfileName(String profileName) {
+    this.profileName = profileName;
+  }
+
   public String getEntity() {
     return entity;
   }
 
-  public Object getValue() {
-    return value;
+  public void setEntity(String entity) {
+    this.entity = entity;
+  }
+
+  public List<Object> getGroups() {
+    return groups;
+  }
+
+  public void setGroups(List<Object> groups) {
+    this.groups = groups;
   }
 
   public ProfilePeriod getPeriod() {
     return period;
   }
 
-  public List<Object> getGroups() {
-    return groups;
+  public void setPeriod(ProfilePeriod period) {
+    this.period = period;
   }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+  public ProfileConfig getDefinition() {
+    return definition;
+  }
 
-    ProfileMeasurement that = (ProfileMeasurement) o;
+  public void setDefinition(ProfileConfig definition) {
+    this.definition = definition;
+  }
+
+  public Object getProfileValue() {
+    return profileValue;
+  }
 
-    if (profileName != null ? !profileName.equals(that.profileName) : that.profileName != null) return false;
-    if (entity != null ? !entity.equals(that.entity) : that.entity != null) return false;
-    if (value != null ? !value.equals(that.value) : that.value != null) return false;
-    if (groups != null ? !groups.equals(that.groups) : that.groups != null) return false;
-    return period != null ? period.equals(that.period) : that.period == null;
+  public void setProfileValue(Object profileValue) {
+    this.profileValue = profileValue;
   }
 
-  @Override
-  public int hashCode() {
-    int result = profileName != null ? profileName.hashCode() : 0;
-    result = 31 * result + (entity != null ? entity.hashCode() : 0);
-    result = 31 * result + (value != null ? value.hashCode() : 0);
-    result = 31 * result + (groups != null ? groups.hashCode() : 0);
-    result = 31 * result + (period != null ? period.hashCode() : 0);
-    return result;
+  public Map<String, Object> getTriageValues() {
+    return triageValues;
   }
 
-  @Override
-  public String toString() {
-    return "ProfileMeasurement{" +
-            "profileName='" + profileName + '\'' +
-            ", entity='" + entity + '\'' +
-            ", value=" + value +
-            ", groups=" + groups +
-            ", period=" + period +
-            '}';
+  public void setTriageValues(Map<String, Object> triageValues) {
+    this.triageValues = triageValues;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
index 2f7f356..c2d8b21 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
@@ -69,6 +69,13 @@ public class ProfilePeriod {
   }
 
   /**
+   * When this period ended in milliseconds since the epoch.
+   */
+  public long getEndTimeMillis() {
+    return getStartTimeMillis() + getDurationMillis();
+  }
+
+  /**
    * Returns the next ProfilePeriod in time.
    */
   public ProfilePeriod next() {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
index bb1baf6..88ec806 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
@@ -49,7 +49,7 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
   public ColumnList columns(ProfileMeasurement measurement) {
 
     ColumnList cols = new ColumnList();
-    cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), SerDeUtils.toBytes(measurement.getValue()));
+    cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), SerDeUtils.toBytes(measurement.getProfileValue()));
 
     return cols;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
index 1434353..794fde4 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileBuilderTest.java
@@ -90,7 +90,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate that x = 100, y = 200
-    assertEquals(100 + 200, (int) convert(m.getValue(), Integer.class));
+    assertEquals(100 + 200, (int) convert(m.getProfileValue(), Integer.class));
   }
 
   /**
@@ -111,7 +111,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate that x = 0 and y = 0 as no initialization occurred
-    assertEquals(0, (int) convert(m.getValue(), Integer.class));
+    assertEquals(0, (int) convert(m.getProfileValue(), Integer.class));
   }
 
   /**
@@ -153,7 +153,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate that x=0, y=0 then x+=1, y+=2 for each message
-    assertEquals(count*1 + count*2, (int) convert(m.getValue(), Integer.class));
+    assertEquals(count*1 + count*2, (int) convert(m.getProfileValue(), Integer.class));
   }
 
   /**
@@ -185,7 +185,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate
-    assertEquals(100, (int) convert(m.getValue(), Integer.class));
+    assertEquals(100, (int) convert(m.getProfileValue(), Integer.class));
   }
 
   /**
@@ -260,8 +260,8 @@ public class ProfileBuilderTest {
 
     // validate
     assertEquals(2, m.getGroups().size());
-    assertEquals(100, (int) convert(m.getGroups().get(0), Integer.class));
-    assertEquals(200, (int) convert(m.getGroups().get(1), Integer.class));
+    assertEquals(100, m.getGroups().get(0));
+    assertEquals(200, m.getGroups().get(1));
   }
 
   /**
@@ -304,7 +304,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate
-    assertEquals(33, (int) convert(m.getValue(), Integer.class));
+    assertEquals(33, m.getProfileValue());
   }
 
   /**
@@ -347,7 +347,7 @@ public class ProfileBuilderTest {
     ProfileMeasurement m = builder.flush();
 
     // validate
-    assertEquals(3, (int) convert(m.getValue(), Integer.class));
+    assertEquals(3, m.getProfileValue());
   }
   /**
    * {
@@ -381,4 +381,81 @@ public class ProfileBuilderTest {
     assertEquals(entity, m.getEntity());
   }
 
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *      "x": "100"
+   *   },
+   *   "result": {
+   *      "profile": "x"
+   *   }
+   * }
+   */
+  @Multiline
+  private String testResultWithProfileExpression;
+
+  /**
+   * Ensure that the result expression is executed on a flush.
+   */
+  @Test
+  public void testResultWithProfileExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testResultWithProfileExpression, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(100, m.getProfileValue());
+  }
+
+  /**
+   * {
+   *   "profile": "test",
+   *   "foreach": "ip_src_addr",
+   *   "init": {
+   *      "x": "100"
+   *   },
+   *   "result": {
+   *      "profile": "x",
+   *      "triage": {
+   *        "zero": "x - 100",
+   *        "hundred": "x"
+   *      }
+   *   }
+   * }
+   */
+  @Multiline
+  private String testResultWithTriageExpression;
+
+  /**
+   * Ensure that both the profile and triage result expressions are executed on a flush.
+   */
+  @Test
+  public void testResultWithTriageExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(testResultWithTriageExpression, ProfileConfig.class);
+    builder = new ProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .build();
+
+    // execute
+    builder.apply(message);
+    ProfileMeasurement m = builder.flush();
+
+    // validate
+    assertEquals(0, m.getTriageValues().get("zero"));
+    assertEquals(100, m.getTriageValues().get("hundred"));
+    assertEquals(100, m.getProfileValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
index 5d7d121..57edea0 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilderTest.java
@@ -226,7 +226,7 @@ public class SaltyRowKeyBuilderTest {
             .withProfileName("profile")
             .withEntity("entity")
             .withPeriod(oldest, periodDuration, periodUnits)
-            .withValue(22);
+            .withProfileValue(22);
 
     // generate a list of expected keys
     List<byte[]> expectedKeys = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md
index 9768c07..b4fc104 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -80,16 +80,16 @@ The Profiler specification requires a JSON-formatted set of elements, many of wh
 The specification for the Profiler topology is stored in Zookeeper at  `/metron/topology/profiler`.  These properties also exist in the local filesystem at `$METRON_HOME/config/zookeeper/profiler.json`. 
 The values can be changed on disk and then uploaded to Zookeeper using `$METRON_HOME/bin/zk_load_configs.sh`.
 
-| Name                  | Required | Description                                                                             |
-| --------------------- | -------- | --------------------------------------------------------------------------------------- |
-| [profile](#profile)   | Required | Unique name identifying the profile.                                                    |
-| [foreach](#foreach)   | Required | A separate profile is maintained "for each" of these.                                   |
-| [onlyif](#onlyif)     | Optional | Boolean expression that determines if a message should be applied to the profile.       |
-| [groupBy](#groupby)   | Optional | One or more Stellar expressions used to group the profile measurements when persisted.  |
-| [init](#init)         | Optional | One or more expressions executed at the start of a window period.                       |
-| [update](#update)     | Required | One or more expressions executed when a message is applied to the profile.              |
-| [result](#result)     | Required | A Stellar expression that is executed when the window period expires.                   |
-| [expires](#expires)   | Optional | Profile data is purged after this period of time, specified in milliseconds.            |
+| Name 	                        |               | Description 	
+|---	                        |---	        |---
+| [profile](#profile)           | Required   	| Unique name identifying the profile. 
+| [foreach](#foreach)           | Required  	| A separate profile is maintained "for each" of these. 
+| [onlyif](#onlyif)  	        | Optional  	| Boolean expression that determines if a message should be applied to the profile.
+| [groupBy](#groupby)           | Optional      | One or more Stellar expressions used to group the profile measurements when persisted.
+| [init](#init)  	            | Optional  	| One or more expressions executed at the start of a window period.
+| [update](#update)  	        | Required  	| One or more expressions executed when a message is applied to the profile.
+| [result](#result)   	        | Required  	| Stellar expressions that are executed when the window period expires.
+| [expires](#expires)           | Optional      | Profile data is purged after this period of time, specified in milliseconds.
 
 ### `profile` 
 
@@ -153,7 +153,44 @@ One or more expressions executed when a message is applied to the profile.  A ma
 
 *Required*
 
-A Stellar expression that is executed when the window period expires.  The expression is expected to summarize the messages that were applied to the profile over the window period, using the state accumulated by the updates.  The result will typically be a single numeric value, but it may be any serializable object, as shown in Example 4 below.  	   
+Stellar expressions that are executed when the window period expires.  The expressions are expected to summarize the messages that were applied to the profile over the window period.  In the most basic form a single result is persisted for later retrieval.
+```
+"result": "var1 + var2"
+```
+
+For more advanced use cases, a profile can generate two types of results.  A profile can define one or both of these result types at the same time. 
+* `profile`:  A required expression that defines a value that is persisted for later retrieval.
+* `triage`: An optional expression that defines values that are accessible within the Threat Triage process.
+
+**profile**
+
+A required Stellar expression that results in a value that is persisted in the profile store for later retrieval.  The expression can result in any object that is Kryo serializable.  These values can be retrieved for later use with the [Profiler Client](../metron-profiler-client). 
+```
+"result": {
+    "profile": "2 + 2"
+}
+```
+
+An alternative, simplified form is also acceptable.
+```
+"result": "2 + 2"
+
+```
+
+**triage**
+
+An optional map of one or more Stellar expressions. The value of each expression is made available to the Threat Triage process under the given name.  Each expression must result in either a primitive type, like an integer, long, or short, or a String.  All other types will result in an error.
+
+In the following example, three values, the minimum, the maximum and the mean, are appended to a message.  This message is consumed by Metron, like other sources of telemetry, and each of these values is accessible from within the Threat Triage process using the given field names: `min`, `max`, and `mean`.
+```
+"result": {
+    "triage": {
+        "min": "STATS_MIN(stats)",
+        "max": "STATS_MAX(stats)",
+        "mean": "STATS_MEAN(stats)"
+    }
+}
+```
 
 ### `expires`
 
@@ -166,21 +203,22 @@ A numeric value that defines how many days the profile data is retained.  After
 The Profiler runs as an independent Storm topology.  The configuration for the Profiler topology is stored in local filesystem at `$METRON_HOME/config/profiler.properties`. 
 The values can be changed on disk and then the Profiler topology must be restarted.
 
-| Setting                               | Description                                                                                                                                                                                                 |
-| ------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
-| profiler.workers                      | The number of worker processes to create for the topology.                                                                                                                                                  |
-| profiler.executors                    | The number of executors to spawn per component.                                                                                                                                                             |
-| profiler.input.topic                  | The name of the Kafka topic from which to consume data.                                                                                                                                                     |
-| profiler.period.duration              | The duration of each profile period.  This value should be defined along with `profiler.period.duration.units`.                                                                                             |
-| profiler.period.duration.units        | The units used to specify the `profiler.period.duration`.                                                                                                                                                   |
-| profiler.ttl                          | If a message has not been applied to a Profile in this period of time, the Profile will be forgotten and its resources will be cleaned up. This value should be defined along with `profiler.ttl.units`.    |
-| profiler.ttl.units                    | The units used to specify the `profiler.ttl`.                                                                                                                                                               |
-| profiler.hbase.salt.divisor           |  A salt is prepended to the row key to help prevent hotspotting.  This constant is used to generate the salt.  Ideally, this constant should be roughly equal to the number of nodes in the Hbase cluster.  |
-| profiler.hbase.table                  | The name of the HBase table that profiles are written to.                                                                                                                                                   |
-| profiler.hbase.column.family          | The column family used to store profiles.                                                                                                                                                                   |
-| profiler.hbase.batch                  | The number of puts that are written in a single batch.                                                                                                                                                      |
-| profiler.hbase.flush.interval.seconds | The maximum number of seconds between batch writes to HBase.                                                                                                                                                |
 
+| Setting   | Description   |
+|---        |---            |
+| profiler.workers | The number of worker processes to create for the topology.   |
+| profiler.executors | The number of executors to spawn per component.  |
+| profiler.input.topic | The name of the Kafka topic from which to consume data.  |
+| profiler.output.topic | The name of the Kafka topic to which profile data is written.  Only used with profiles that use the [`triage` result field](#result).  |
+| profiler.period.duration | The duration of each profile period.  This value should be defined along with `profiler.period.duration.units`.  |
+| profiler.period.duration.units | The units used to specify the `profiler.period.duration`. |
+| profiler.ttl | If a message has not been applied to a Profile in this period of time, the Profile will be forgotten and its resources will be cleaned up. This value should be defined along with `profiler.ttl.units`. |
+| profiler.ttl.units | The units used to specify the `profiler.ttl`. |
+| profiler.hbase.salt.divisor  |  A salt is prepended to the row key to help prevent hotspotting.  This constant is used to generate the salt.  Ideally, this constant should be roughly equal to the number of nodes in the Hbase cluster.  |
+| profiler.hbase.table | The name of the HBase table that profiles are written to.  |
+| profiler.hbase.column.family | The column family used to store profiles. |
+| profiler.hbase.batch | The number of puts that are written in a single batch.  |
+| profiler.hbase.flush.interval.seconds | The maximum number of seconds between batch writes to HBase. |
 
 After altering the configuration, start the Profiler.
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/pom.xml b/metron-analytics/metron-profiler/pom.xml
index 355c4b0..3c8baef 100644
--- a/metron-analytics/metron-profiler/pom.xml
+++ b/metron-analytics/metron-profiler/pom.xml
@@ -54,12 +54,27 @@
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-writer</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-statistics</artifactId>
             <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>kryo</artifactId>
+                    <groupId>com.esotericsoftware</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
@@ -98,6 +113,11 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo-shaded</artifactId>
+            <version>3.0.3</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
             <version>${global_hadoop_version}</version>
@@ -164,6 +184,10 @@
                     <artifactId>log4j-slf4j-impl</artifactId>
                     <groupId>org.apache.logging.log4j</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>kryo</artifactId>
+                    <groupId>com.esotericsoftware</groupId>
+                </exclusion>
             </exclusions>
             <scope>provided</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index 1f4cc0d..91e4226 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -23,6 +23,7 @@
 profiler.workers=1
 profiler.executors=0
 profiler.input.topic=indexing
+profiler.output.topic=enrichments
 profiler.period.duration=15
 profiler.period.duration.units=MINUTES
 profiler.ttl=30
@@ -33,6 +34,7 @@ profiler.hbase.column.family=P
 profiler.hbase.batch=10
 profiler.hbase.flush.interval.seconds=30
 
+
 ##### Kafka #####
 
 kafka.zk=node1:2181

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 0b2a0db..0a26b73 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -17,10 +17,12 @@
 name: "profiler"
 
 config:
+
     topology.workers: ${profiler.workers}
     topology.acker.executors: ${profiler.executors}
 
 components:
+
     -   id: "rowKeyBuilder"
         className: "org.apache.metron.profiler.hbase.SaltyRowKeyBuilder"
         properties:
@@ -61,8 +63,21 @@ components:
             - "indexing"
         configMethods:
             -   name: "from"
-                args:
-                    - "${kafka.start}"
+                args: ["${kafka.start}"]
+
+    -   id: "kafkaWriter"
+        className: "org.apache.metron.writer.kafka.KafkaWriter"
+        configMethods:
+            -   name: "withTopic"
+                args: ["${profiler.output.topic}"]
+            -   name: "withZkQuorum"
+                args: ["${kafka.zk}"]
+
+    -   id: "kafkaDestinationHandler"
+        className: "org.apache.metron.profiler.bolt.KafkaDestinationHandler"
+
+    -   id: "hbaseDestinationHandler"
+        className: "org.apache.metron.profiler.bolt.HBaseDestinationHandler"
 
 spouts:
 
@@ -85,8 +100,12 @@ bolts:
         configMethods:
             - name: "withPeriodDuration"
               args: [${profiler.period.duration}, "${profiler.period.duration.units}"]
-            - name: "withTimeToLive"
+            - name: "withProfileTimeToLive"
               args: [${profiler.ttl}, "${profiler.ttl.units}"]
+            - name: "withDestinationHandler"
+              args: [ref: "kafkaDestinationHandler"]
+            - name: "withDestinationHandler"
+              args: [ref: "hbaseDestinationHandler"]
 
     -   id: "hbaseBolt"
         className: "org.apache.metron.hbase.bolt.HBaseBolt"
@@ -101,6 +120,16 @@ bolts:
             - name: "withFlushIntervalSecs"
               args: [${profiler.hbase.flush.interval.seconds}]
 
+    -   id: "kafkaBolt"
+        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
+        constructorArgs:
+            - "${kafka.zk}"
+        configMethods:
+            -   name: "withMessageWriter"
+                args: [ref: "kafkaWriter"]
+            -   name: "withMessageGetter"
+                args: ["NAMED"]
+
 streams:
 
     -   name: "spout -> splitter"
@@ -114,10 +143,18 @@ streams:
         to: "builderBolt"
         grouping:
             type: FIELDS
-            args: ["entity", "profile", "message"]
+            args: ["entity", "profile"]
 
     -   name: "builder -> hbase"
         from: "builderBolt"
         to: "hbaseBolt"
         grouping:
+            streamId: "hbase"
+            type: SHUFFLE
+
+    -   name: "builder -> kafka"
+        from: "builderBolt"
+        to: "kafkaBolt"
+        grouping:
+            streamId: "kafka"
             type: SHUFFLE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
new file mode 100644
index 0000000..2257784
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/DestinationHandler.java
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+
+/**
+ * This class handles the mechanics of emitting a profile measurement to a
+ * stream responsible for writing to a specific destination.
+ *
+ * The measurements produced by a profile can be written to one or more
+ * destinations: HBase, Kafka, etc.  Each of the destinations leverages a
+ * separate stream within the topology definition.
+ */
+public interface DestinationHandler {
+
+  /**
+   * Each destination leverages a unique stream.  This method defines
+   * the unique stream identifier.
+   *
+   * The stream identifier must also be declared within the topology
+   * definition.
+   */
+  String getStreamId();
+
+  /**
+   * Declares the output fields for the stream.
+   * @param declarer The declarer on which the destination's output stream is declared.
+   */
+  void declareOutputFields(OutputFieldsDeclarer declarer);
+
+  /**
+   * Emit the measurement.
+   * @param measurement The measurement to emit.
+   * @param collector The output collector.
+   */
+  void emit(ProfileMeasurement measurement, OutputCollector collector);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
new file mode 100644
index 0000000..4fa5dc1
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/HBaseDestinationHandler.java
@@ -0,0 +1,58 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.bolt;
+
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.io.Serializable;
+
+/**
+ * Handles emitting a ProfileMeasurement to the stream which writes
+ * profile measurements to HBase.
+ */
+public class HBaseDestinationHandler implements DestinationHandler, Serializable {
+
+  /**
+   * The stream identifier used for this destination.
+   */
+  private String streamId = "hbase";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declareStream(getStreamId(), new Fields("measurement"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+    collector.emit(getStreamId(), new Values(measurement));
+  }
+
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
new file mode 100644
index 0000000..5d8c971
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/KafkaDestinationHandler.java
@@ -0,0 +1,110 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Handles emitting a ProfileMeasurement to the stream which writes
+ * profile measurements to Kafka.
+ */
+public class KafkaDestinationHandler implements DestinationHandler, Serializable {
+
+  protected static final Logger LOG = LoggerFactory.getLogger(KafkaDestinationHandler.class);
+
+  /**
+   * The stream identifier used for this destination.
+   */
+  private String streamId = "kafka";
+
+  /**
+   * The 'source.type' of messages originating from the Profiler.
+   */
+  private String sourceType = "profiler";
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    // the kafka writer expects a field named 'message'
+    declarer.declareStream(getStreamId(), new Fields("message"));
+  }
+
+  @Override
+  public void emit(ProfileMeasurement measurement, OutputCollector collector) {
+
+    JSONObject message = new JSONObject();
+    message.put("profile", measurement.getDefinition().getProfile());
+    message.put("entity", measurement.getEntity());
+    message.put("period", measurement.getPeriod().getPeriod());
+    message.put("period.start", measurement.getPeriod().getStartTimeMillis());
+    message.put("period.end", measurement.getPeriod().getEndTimeMillis());
+    message.put("timestamp", System.currentTimeMillis());
+    message.put("source.type", sourceType);
+    message.put("is_alert", "true");
+
+    // append each of the triage values to the message
+    measurement.getTriageValues().forEach((key, value) -> {
+
+      if(isValidType(value)) {
+        message.put(key, value);
+
+      } else {
+        LOG.error(String.format("triage expression has invalid type. expect primitive types only. skipping: profile=%s, entity=%s, expression=%s, type=%s",
+                measurement.getDefinition().getProfile(), measurement.getEntity(), key, ClassUtils.getShortClassName(value, "null")));
+      }
+    });
+
+    collector.emit(getStreamId(), new Values(message));
+  }
+
+  /**
+   * The result of a profile's triage expressions must be a string or primitive type.
+   *
+   * This ensures that the value can be easily serialized and appended to a message destined for Kafka.
+   *
+   * @param value The value of a triage expression.
+   * @return True, if the type of the value is valid.
+   */
+  private boolean isValidType(Object value) {
+    return value != null && (value instanceof String || ClassUtils.isPrimitiveOrWrapper(value.getClass()));
+  }
+
+  @Override
+  public String getStreamId() {
+    return streamId;
+  }
+
+  public void setStreamId(String streamId) {
+    this.streamId = streamId;
+  }
+
+  public void setSourceType(String sourceType) {
+    this.sourceType = sourceType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index a22361d..695f7b7 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -32,15 +32,15 @@ import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.TupleUtils;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -70,9 +70,9 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
    * If a message has not been applied to a Profile in this number of milliseconds,
    * the Profile will be forgotten and its resources will be cleaned up.
    *
-   * The TTL must be at least greater than the period duration.
+   * WARNING: The TTL must not be less than the period duration.
    */
-  private long timeToLiveMillis;
+  private long profileTimeToLiveMillis;
 
   /**
    * Maintains the state of a profile which is unique to a profile/entity pair.
@@ -85,10 +85,17 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   private transient JSONParser parser;
 
   /**
+   * The measurements produced by a profile can be written to multiple destinations.  Each
+   * destination is handled by a separate `DestinationHandler`.
+   */
+  private List<DestinationHandler> destinationHandlers;
+
+  /**
    * @param zookeeperUrl The Zookeeper URL that contains the configuration data.
    */
   public ProfileBuilderBolt(String zookeeperUrl) {
     super(zookeeperUrl);
+    this.destinationHandlers = new ArrayList<>();
   }
 
   /**
@@ -107,34 +114,45 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     super.prepare(stormConf, context, collector);
 
-    if(timeToLiveMillis < periodDurationMillis) {
+    if(profileTimeToLiveMillis < periodDurationMillis) {
       throw new IllegalStateException(format(
               "invalid configuration: expect profile TTL (%d) to be greater than period duration (%d)",
-              timeToLiveMillis,
+              profileTimeToLiveMillis,
               periodDurationMillis));
     }
     this.collector = collector;
     this.parser = new JSONParser();
     this.profileCache = CacheBuilder
             .newBuilder()
-            .expireAfterAccess(timeToLiveMillis, TimeUnit.MILLISECONDS)
+            .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
             .build();
   }
 
-  /**
-   * The builder emits a single field, 'measurement', which contains a ProfileMeasurement. A
-   * ProfileMeasurement is emitted when a time window expires and a flush occurs.
-   */
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    // once the time window expires, a complete ProfileMeasurement is emitted
-    declarer.declare(new Fields("measurement", "profile"));
+    if(destinationHandlers.size() == 0) {
+      throw new IllegalStateException("At least one destination handler must be defined.");
+    }
+
+    // each destination will define its own stream
+    destinationHandlers.forEach(dest -> dest.declareOutputFields(declarer));
   }
 
+  /**
+   * Expect to receive either a tick tuple or a telemetry message that needs to be applied
+   * to a profile.
+   * @param input The tuple.
+   */
   @Override
   public void execute(Tuple input) {
     try {
-      doExecute(input);
+      if(TupleUtils.isTick(input)) {
+        handleTick();
+        profileCache.cleanUp();
+
+      } else {
+        handleMessage(input);
+      }
 
     } catch (Throwable e) {
       LOG.error(format("Unexpected failure: message='%s', tuple='%s'", e.getMessage(), input), e);
@@ -146,32 +164,28 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   }
 
   /**
-   * Update the execution environment based on data contained in the
-   * message.  If the tuple is a tick tuple, then flush the profile
-   * and reset the execution environment.
-   * @param input The tuple to execute.
+   * Handles a telemetry message.
+   * @param input The tuple.
    */
-  private void doExecute(Tuple input) throws ExecutionException {
-
-    if(TupleUtils.isTick(input)) {
-
-      // when a 'tick' is received, flush the profile and emit the completed profile measurement
-      profileCache.asMap().forEach((key, profileBuilder) -> {
-        if(profileBuilder.isInitialized()) {
-          ProfileMeasurement measurement = profileBuilder.flush();
-          collector.emit(new Values(measurement, profileBuilder.getDefinition()));
-        }
-      });
+  private void handleMessage(Tuple input) throws ExecutionException {
+    JSONObject message = getField("message", input, JSONObject.class);
+    getBuilder(input).apply(message);
+  }
 
-      // cache maintenance
-      profileCache.cleanUp();
+  /**
+   * Handles a tick tuple.
+   */
+  private void handleTick() {
+    profileCache.asMap().forEach((key, profileBuilder) -> {
+      if(profileBuilder.isInitialized()) {
 
-    } else {
+        // flush the profile
+        ProfileMeasurement measurement = profileBuilder.flush();
 
-      // telemetry message provides additional context for 'init' and 'update' expressions
-      JSONObject message = getField("message", input, JSONObject.class);
-      getBuilder(input).apply(message);
-    }
+        // forward the measurement to each destination handler
+        destinationHandlers.forEach(handler -> handler.emit(measurement, collector));
+      }
+    });
   }
 
   /**
@@ -213,7 +227,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
   private <T> T getField(String fieldName, Tuple tuple, Class<T> clazz) {
     T value = ConversionUtils.convert(tuple.getValueByField(fieldName), clazz);
     if(value == null) {
-      throw new IllegalStateException(format("invalid tuple received: missing field '%s'", fieldName));
+      throw new IllegalStateException(format("invalid tuple received: missing or invalid field '%s'", fieldName));
     }
 
     return value;
@@ -228,13 +242,17 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
     return withPeriodDurationMillis(units.toMillis(duration));
   }
 
-  public ProfileBuilderBolt withTimeToLiveMillis(long timeToLiveMillis) {
-    this.timeToLiveMillis = timeToLiveMillis;
+  public ProfileBuilderBolt withProfileTimeToLiveMillis(long timeToLiveMillis) {
+    this.profileTimeToLiveMillis = timeToLiveMillis;
     return this;
   }
 
-  public ProfileBuilderBolt withTimeToLive(int duration, TimeUnit units) {
-    return withTimeToLiveMillis(units.toMillis(duration));
+  public ProfileBuilderBolt withProfileTimeToLive(int duration, TimeUnit units) {
+    return withProfileTimeToLiveMillis(units.toMillis(duration));
   }
 
+  public ProfileBuilderBolt withDestinationHandler(DestinationHandler handler) {
+    this.destinationHandlers.add(handler);
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
index cdde001..5402ac8 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileHBaseMapper.java
@@ -93,7 +93,8 @@ public class ProfileHBaseMapper implements HBaseMapper {
   public Optional<Long> getTTL(Tuple tuple) {
     Optional<Long> result = Optional.empty();
 
-    ProfileConfig profileConfig = (ProfileConfig) tuple.getValueByField("profile");
+    ProfileMeasurement measurement = (ProfileMeasurement) tuple.getValueByField("measurement");
+    ProfileConfig profileConfig = measurement.getDefinition();
     if(profileConfig.getExpires() != null) {
       result = Optional.of(profileConfig.getExpires());
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index ae62699..0fb1fd2 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -134,8 +134,7 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
     Map<String, Object> state = (Map<String, Object>)message;
 
     // is this message needed by this profile?
-    String onlyIf = profile.getOnlyif();
-    if (StringUtils.isBlank(onlyIf) || executor.execute(onlyIf, state, Boolean.class)) {
+    if (executor.execute(profile.getOnlyif(), state, Boolean.class)) {
 
       // what is the name of the entity in this message?
       String entity = executor.execute(profile.getForeach(), state, String.class);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
new file mode 100644
index 0000000..b003ce0
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/readme-example-4/profiler.json
@@ -0,0 +1,11 @@
+{
+  "profiles": [
+    {
+      "profile": "example4",
+      "foreach": "ip_src_addr",
+      "onlyif": "protocol == 'HTTP'",
+      "update": { "s": "STATS_ADD(s, length)" },
+      "result": "s"
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json
deleted file mode 100644
index 8f24cea..0000000
--- a/metron-analytics/metron-profiler/src/test/config/zookeeper/write-integer/profiler.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-  "profiles": [
-    {
-      "profile": "example1",
-      "foreach": "ip_src_addr",
-      "init": {},
-      "update": {},
-      "result": "TO_INTEGER(10.0)"
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
new file mode 100644
index 0000000..c3f2584
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/KafkaDestinationHandlerTest.java
@@ -0,0 +1,203 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.bolt;
+
+import com.google.common.collect.ImmutableMap;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests the KafkaDestinationHandler.
+ */
+public class KafkaDestinationHandlerTest {
+
+  /**
+   * {
+   *   "profile": "profile-one-destination",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String profileDefinition;
+
+  private KafkaDestinationHandler handler;
+  private ProfileConfig profile;
+  private OutputCollector collector;
+
+  @Before
+  public void setup() throws Exception {
+    handler = new KafkaDestinationHandler();
+    profile = createDefinition(profileDefinition);
+    collector = Mockito.mock(OutputCollector.class);
+  }
+
+  /**
+   * The handler must serialize the ProfileMeasurement into a JSONObject.
+   */
+  @Test
+  public void testSerialization() throws Exception {
+
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", "triage-value"))
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+
+    // expect a JSONObject
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+
+    // validate the json
+    JSONObject actual = (JSONObject) values.get(0);
+    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
+    assertEquals(measurement.getEntity(), actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+    assertNotNull(actual.get("timestamp"));
+    assertEquals("profiler", actual.get("source.type"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits the types of values
+   * that can result from a triage expression.  Only primitive types and Strings are allowed.
+   */
+  @Test
+  public void testInvalidType() throws Exception {
+
+    // create one invalid expression and one valid expression
+    Map<String, Object> triageValues = ImmutableMap.of(
+            "invalid", new OnlineStatisticsProvider(),
+            "valid", 4);
+
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(triageValues)
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+
+    // only the triage expression value itself should have been skipped, all others should be there
+    JSONObject actual = (JSONObject) values.get(0);
+    assertEquals(measurement.getDefinition().getProfile(), actual.get("profile"));
+    assertEquals(measurement.getEntity(), actual.get("entity"));
+    assertEquals(measurement.getPeriod().getPeriod(), actual.get("period"));
+    assertEquals(measurement.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+    assertEquals(measurement.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+    assertNotNull(actual.get("timestamp"));
+    assertEquals("profiler", actual.get("source.type"));
+
+    // the invalid expression should be skipped due to invalid type
+    assertFalse(actual.containsKey("invalid"));
+
+    // but the valid expression should still be there
+    assertEquals(triageValues.get("valid"), actual.get("valid"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits the types of values
+   * that can result from a triage expression.  Only primitive types and Strings are allowed.
+   */
+  @Test
+  public void testIntegerIsValidType() throws Exception {
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", 123))
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+    JSONObject actual = (JSONObject) values.get(0);
+
+    // the triage expression is valid
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+  }
+
+  /**
+   * Values destined for Kafka can only be serialized into text, which limits the types of values
+   * that can result from a triage expression.  Only primitive types and Strings are allowed.
+   */
+  @Test
+  public void testStringIsValidType() throws Exception {
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withPeriod(20000, 15, TimeUnit.MINUTES)
+            .withTriageValues(Collections.singletonMap("triage-key", "value"))
+            .withDefinition(profile);
+    handler.emit(measurement, collector);
+
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+    verify(collector, times(1)).emit(eq(handler.getStreamId()), arg.capture());
+    Values values = arg.getValue();
+    assertTrue(values.get(0) instanceof JSONObject);
+    JSONObject actual = (JSONObject) values.get(0);
+
+    // the triage expression is valid
+    assertEquals(measurement.getTriageValues().get("triage-key"), actual.get("triage-key"));
+  }
+
+  /**
+   * Creates a profile definition based on a string of JSON.
+   * @param json The string of JSON.
+   */
+  private ProfileConfig createDefinition(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
index 8d610bd..935fe57 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java
@@ -28,6 +28,7 @@ import org.apache.metron.profiler.ProfileBuilder;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.storm.Constants;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
 import org.json.simple.JSONObject;
@@ -45,7 +46,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -121,9 +124,13 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
     bolt.setCuratorFramework(client);
     bolt.setTreeCache(cache);
     bolt.withPeriodDuration(10, TimeUnit.MINUTES);
-    bolt.withTimeToLive(30, TimeUnit.MINUTES);
-    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
+    bolt.withProfileTimeToLive(30, TimeUnit.MINUTES);
+
+    // define the valid destinations for the profiler
+    bolt.withDestinationHandler(new HBaseDestinationHandler());
+    bolt.withDestinationHandler(new KafkaDestinationHandler());
 
+    bolt.prepare(new HashMap<>(), topologyContext, outputCollector);
     return bolt;
   }
 
@@ -242,11 +249,11 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
   }
 
   /**
-   * A ProfileMeasurement should be emitted for each profile/entity currently being tracked
-   * by the bolt.
+   * A ProfileMeasurement is built for each profile/entity pair.  A measurement for each profile/entity
+   * pair should be emitted.
    */
   @Test
-  public void testEmitMeasurementsOnFlush() throws Exception {
+  public void testEmitMeasurements() throws Exception {
 
     // setup
     ProfileBuilderBolt bolt = createBolt();
@@ -267,33 +274,64 @@ public class ProfileBuilderBoltTest extends BaseBoltTest {
 
     // capture the ProfileMeasurement that should be emitted
     ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
-    verify(outputCollector, times(2)).emit(arg.capture());
 
-    // validate
-    for(Values value : arg.getAllValues()) {
+    // validate emitted measurements for hbase
+    verify(outputCollector, atLeastOnce()).emit(eq("hbase"), arg.capture());
+    for (Values value : arg.getAllValues()) {
 
       ProfileMeasurement measurement = (ProfileMeasurement) value.get(0);
-      ProfileConfig definition = (ProfileConfig) value.get(1);
+      ProfileConfig definition = measurement.getDefinition();
 
-      if(StringUtils.equals(definitionTwo.getProfile(), definition.getProfile())) {
+      if (StringUtils.equals(definitionTwo.getProfile(), definition.getProfile())) {
 
         // validate measurement emitted for profile two
         assertEquals(definitionTwo, definition);
         assertEquals(entity, measurement.getEntity());
         assertEquals(definitionTwo.getProfile(), measurement.getProfileName());
-        assertEquals(1, (int) convert(measurement.getValue(), Integer.class));
+        assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class));
 
-      } else if(StringUtils.equals(definitionOne.getProfile(), definition.getProfile())) {
+      } else if (StringUtils.equals(definitionOne.getProfile(), definition.getProfile())) {
 
         // validate measurement emitted for profile one
         assertEquals(definitionOne, definition);
         assertEquals(entity, measurement.getEntity());
         assertEquals(definitionOne.getProfile(), measurement.getProfileName());
-        assertEquals(1, (int) convert(measurement.getValue(), Integer.class));
+        assertEquals(1, (int) convert(measurement.getProfileValue(), Integer.class));
 
       } else {
         fail();
       }
     }
   }
+
+  /**
+   * A ProfileMeasurement is built for each profile/entity pair.  The measurement should be emitted to each
+   * destination defined by the profile. By default, a profile uses both Kafka and HBase as destinations.
+   */
+  @Test
+  public void testDestinationHandlers() throws Exception {
+
+    // setup
+    ProfileBuilderBolt bolt = createBolt();
+    ProfileConfig definitionOne = createDefinition(profileOne);
+
+    // apply the message to the first profile
+    final String entity = (String) messageOne.get("ip_src_addr");
+    Tuple tupleOne = createTuple(entity, messageOne, definitionOne);
+    bolt.execute(tupleOne);
+
+    // trigger a flush of the profile
+    bolt.execute(mockTickTuple());
+
+    // capture the values that should be emitted
+    ArgumentCaptor<Values> arg = ArgumentCaptor.forClass(Values.class);
+
+    // validate measurements emitted to HBase
+    verify(outputCollector, times(1)).emit(eq("hbase"), arg.capture());
+    assertTrue(arg.getValue().get(0) instanceof ProfileMeasurement);
+
+    // validate measurements emitted to Kafka
+    verify(outputCollector, times(1)).emit(eq("kafka"), arg.capture());
+    assertTrue(arg.getValue().get(0) instanceof JSONObject);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
index 7e0606e..bdeef0b 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileHBaseMapperTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.metron.profiler.bolt;
 
+import org.apache.metron.common.configuration.profiler.ProfileResult;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.profiler.ProfileMeasurement;
@@ -59,18 +60,18 @@ public class ProfileHBaseMapperTest {
     mapper = new ProfileHBaseMapper();
     mapper.setRowKeyBuilder(rowKeyBuilder);
 
+    profile = new ProfileConfig("profile", "ip_src_addr", new ProfileResult("2 + 2"));
+
     measurement = new ProfileMeasurement()
             .withProfileName("profile")
             .withEntity("entity")
             .withPeriod(20000, 15, TimeUnit.MINUTES)
-            .withValue(22);
-
-    profile = new ProfileConfig();
+            .withProfileValue(22)
+            .withDefinition(profile);
 
     // the tuple will contain the original message
     tuple = mock(Tuple.class);
     when(tuple.getValueByField(eq("measurement"))).thenReturn(measurement);
-    when(tuple.getValueByField(eq("profile"))).thenReturn(profile);
   }
 
   /**
@@ -91,16 +92,8 @@ public class ProfileHBaseMapperTest {
    */
   @Test
   public void testExpiresUndefined() throws Exception {
-
-    // do not set the TTL on the profile
-    ProfileConfig profileNoTTL = new ProfileConfig();
-
-    // the tuple references the profile with the missing TTL
-    Tuple tupleNoTTL = mock(Tuple.class);
-    when(tupleNoTTL.getValueByField(eq("profile"))).thenReturn(profileNoTTL);
-
     // the TTL should not be defined
-    Optional<Long> actual = mapper.getTTL(tupleNoTTL);
+    Optional<Long> actual = mapper.getTTL(tuple);
     Assert.assertFalse(actual.isPresent());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
index 0ac1c33..0879835 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java
@@ -63,7 +63,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    *        "onlyif": "protocol == 'HTTP'",
    *        "init": {},
    *        "update": {},
-   *        "result": 2
+   *        "result": "2"
    *      }
    *   ]
    * }
@@ -80,7 +80,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    *        "onlyif": "false",
    *        "init": {},
    *        "update": {},
-   *        "result": 2
+   *        "result": "2"
    *      }
    *   ]
    * }
@@ -96,7 +96,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    *        "foreach": "ip_src_addr",
    *        "init": {},
    *        "update": {},
-   *        "result": 2
+   *        "result": "2"
    *      }
    *   ]
    * }
@@ -113,7 +113,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest {
    *        "onlyif": "NOT-VALID",
    *        "init": {},
    *        "update": {},
-   *        "result": 2
+   *        "result": "2"
    *      }
    *   ]
    * }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/818b0b17/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index e09f7f1..357908d 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -21,7 +21,6 @@
 package org.apache.metron.profiler.integration;
 
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.commons.lang.math.NumberUtils;
 import org.apache.commons.math.util.MathUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -39,6 +38,8 @@ import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
+import org.apache.metron.statistics.StatisticsProvider;
 import org.apache.metron.test.mock.MockHTable;
 import org.junit.After;
 import org.junit.Assert;
@@ -111,6 +112,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   private static final String tableName = "profiler";
   private static final String columnFamily = "P";
   private static final double epsilon = 0.001;
+  private static final String inputTopic = Constants.INDEXING_TOPIC;
+  private static final String outputTopic = "profiles";
 
   /**
    * A TableProvider that allows us to mock HBase.
@@ -135,7 +138,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
@@ -160,7 +163,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // expect 2 values written by the profile; one for 10.0.0.2 and another for 10.0.0.3
     final int expected = 2;
@@ -193,7 +196,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
@@ -208,29 +211,29 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     ));
   }
 
+  /**
+   * Tests the fourth example contained within the README.
+   */
   @Test
-  public void testWriteInteger() throws Exception {
+  public void testExample4() throws Exception {
 
-    setup(TEST_RESOURCES + "/config/zookeeper/write-integer");
+    setup(TEST_RESOURCES + "/config/zookeeper/readme-example-4");
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
-
-    // expect 3 values written by the profile; one for each host
-    final int expected = 3;
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // verify - ensure the profile is being persisted
-    waitOrTimeout(() -> profilerTable.getPutLog().size() >= expected,
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
             timeout(seconds(90)));
 
-    // verify - the profile sees messages from 3 hosts; 10.0.0.[1-3]
-    List<Integer> actuals = read(profilerTable.getPutLog(), columnFamily, columnBuilder.getColumnQualifier("value"), Integer.class);
-    Assert.assertEquals(3, actuals.size());
+    // verify - only 10.0.0.2 sends 'HTTP', thus there should be only 1 value
+    byte[] column = columnBuilder.getColumnQualifier("value");
+    List<OnlineStatisticsProvider> actuals = read(profilerTable.getPutLog(), columnFamily, column, OnlineStatisticsProvider.class);
 
-    // verify - the profile writes 10 as an integer
+    // verify - there are 5 'HTTP' messages each with a length of 20, thus the average should be 20
     Assert.assertTrue(actuals.stream().anyMatch(val ->
-            MathUtils.equals(val, 10.0, epsilon)
+            MathUtils.equals(val.getMean(), 20.0, epsilon)
     ));
   }
 
@@ -241,7 +244,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     // start the topology and write test messages to kafka
     fluxComponent.submitTopology();
-    kafkaComponent.writeMessages(Constants.INDEXING_TOPIC, input);
+    kafkaComponent.writeMessages(inputTopic, input);
 
     // verify - ensure the profile is being persisted
     waitOrTimeout(() -> profilerTable.getPutLog().size() > 0,
@@ -290,7 +293,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       setProperty("kafka.start", SpoutConfig.Offset.BEGINNING.name());
       setProperty("profiler.workers", "1");
       setProperty("profiler.executors", "0");
-      setProperty("profiler.input.topic", Constants.INDEXING_TOPIC);
+      setProperty("profiler.input.topic", inputTopic);
+      setProperty("profiler.output.topic", outputTopic);
       setProperty("profiler.period.duration", "20");
       setProperty("profiler.period.duration.units", "SECONDS");
       setProperty("profiler.ttl", "30");
@@ -310,8 +314,9 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     zkComponent = getZKServerComponent(topologyProperties);
 
     // create the input topic
-    kafkaComponent = getKafkaComponent(topologyProperties,
-            Arrays.asList(new KafkaComponent.Topic(Constants.INDEXING_TOPIC, 1)));
+    kafkaComponent = getKafkaComponent(topologyProperties, Arrays.asList(
+            new KafkaComponent.Topic(inputTopic, 1),
+            new KafkaComponent.Topic(outputTopic, 1)));
 
     // upload profiler configuration to zookeeper
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()