You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/05/14 21:00:56 UTC

metron git commit: METRON-1551 Profiler Should Not Use Java Serialization (nickwallen) closes apache/metron#1012

Repository: metron
Updated Branches:
  refs/heads/master b9453aabd -> 9ce4ba5a9


METRON-1551 Profiler Should Not Use Java Serialization (nickwallen) closes apache/metron#1012


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9ce4ba5a
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9ce4ba5a
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9ce4ba5a

Branch: refs/heads/master
Commit: 9ce4ba5a9c8febc3a3ed6992f73f99396d7248a9
Parents: b9453aa
Author: nickwallen <ni...@nickallen.org>
Authored: Mon May 14 17:00:34 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Mon May 14 17:00:34 2018 -0400

----------------------------------------------------------------------
 .../metron/profiler/ProfileMeasurement.java     |   3 +-
 .../apache/metron/profiler/ProfilePeriod.java   |   3 +-
 .../metron/profiler/ProfileMeasurementTest.java | 108 +++++++++++++++++++
 .../metron/profiler/ProfilePeriodTest.java      |  49 +++++++++
 metron-analytics/metron-profiler/README.md      |  62 +++++++----
 .../src/main/config/profiler.properties         |  13 +++
 .../src/main/flux/profiler/remote.yaml          |   3 +
 .../zookeeper/profile-with-stats/profiler.json  |  12 +++
 .../integration/ProfilerIntegrationTest.java    |  83 +++++++++++++-
 .../statistics/sampling/UniformSampler.java     |   5 +-
 .../package/templates/profiler.properties.j2    |  13 +++
 .../configuration/profiler/ProfileResult.java   |   4 +-
 .../profiler/ProfileResultExpressions.java      |   4 +-
 .../profiler/ProfileTriageExpressions.java      |   3 +-
 .../configuration/profiler/ProfilerConfig.java  |  12 +--
 .../apache/metron/common/utils/SerDeUtils.java  |   7 +-
 .../profiler/ProfilerConfigTest.java            |  85 ++++++++++++++-
 .../stellar/common/utils/BloomFilter.java       |   8 +-
 .../metron/stellar/common/utils/SerDeUtils.java |  16 +--
 .../common/utils/StellarProcessorUtils.java     | 101 +++++++++++++++--
 20 files changed, 537 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index f6cc286..4737c3d 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -22,6 +22,7 @@ package org.apache.metron.profiler;
 
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -33,7 +34,7 @@ import java.util.concurrent.TimeUnit;
  * <p>A profile contains many individual {@link ProfileMeasurement} values captured over a
  * period of time.  These values in aggregate form a time series.
  */
-public class ProfileMeasurement {
+public class ProfileMeasurement implements Serializable {
 
   /**
    * The name of the profile that this measurement is associated with.

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
index c2d8b21..cbb8275 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
@@ -20,6 +20,7 @@
 
 package org.apache.metron.profiler;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -33,7 +34,7 @@ import static java.lang.String.format;
  * The Profiler captures a ProfileMeasurement once every ProfilePeriod.  There can be
  * multiple ProfilePeriods every hour.
  */
-public class ProfilePeriod {
+public class ProfilePeriod implements Serializable {
 
   /**
    * A monotonically increasing number identifying the period.  The first period is 0

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileMeasurementTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileMeasurementTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileMeasurementTest.java
new file mode 100644
index 0000000..3a8d650
--- /dev/null
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfileMeasurementTest.java
@@ -0,0 +1,108 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.profiler.ProfileConfig;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ProfileMeasurementTest {
+
+  /**
+   * {
+   *    "profile": "test",
+   *    "foreach": "ip_src_addr",
+   *    "update": {},
+   *    "result": {
+   *      "profile": "2 + 2",
+   *      "triage": {
+   *        "eight": "4 + 4",
+   *        "sixteen": "8 + 8"
+   *      }
+   *    }
+   * }
+   */
+  @Multiline
+  private String profile;
+  private ProfileConfig definition;
+  private ProfileMeasurement measurement;
+
+  public void setup() throws Exception {
+    definition = ProfileConfig.fromJSON(profile);
+
+    measurement = new ProfileMeasurement()
+            .withProfileName("profile")
+            .withEntity("entity")
+            .withDefinition(definition)
+            .withPeriod(System.currentTimeMillis(), 10, TimeUnit.MINUTES)
+            .withProfileValue(22)
+            .withTriageValues(Collections.singletonMap("max", 200));
+  }
+
+  /**
+   * Ensure that the {@link ProfileMeasurement} can undergo Kryo serialization which
+   * occurs when the Profiler is running in Storm.
+   */
+  @Test
+  public void testKryoSerialization() throws Exception {
+
+    // round-trip serialization
+    byte[] raw = SerDeUtils.toBytes(measurement);
+    Object actual = SerDeUtils.fromBytes(raw, Object.class);
+    assertEquals(measurement, actual);
+  }
+
+  /**
+   * Ensure that the {@link ProfileMeasurement} can undergo Java serialization, should a user
+   * prefer that over Kryo serialization, which can occur when the Profiler is running
+   * in Storm.
+   */
+  @Test
+  public void testJavaSerialization() throws Exception {
+
+    // serialize using java
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(bytes);
+    out.writeObject(measurement);
+
+    // the serialized bits
+    byte[] raw = bytes.toByteArray();
+    assertTrue(raw.length > 0);
+
+    // deserialize using java
+    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw));
+    Object actual = in.readObject();
+
+    // ensure that the round-trip was successful
+    assertEquals(measurement, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
index 1a72111..f52bd09 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/ProfilePeriodTest.java
@@ -20,12 +20,18 @@
 
 package org.apache.metron.profiler;
 
+import org.apache.metron.common.utils.SerDeUtils;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests the ProfilePeriod class.
@@ -124,4 +130,47 @@ public class ProfilePeriodTest {
     TimeUnit units = TimeUnit.HOURS;
     new ProfilePeriod(0, duration, units);
   }
+
+  /**
+   * Ensure that the ProfilePeriod can undergo Kryo serialization which
+   * occurs when the Profiler is running in Storm.
+   */
+  @Test
+  public void testKryoSerialization() throws Exception {
+
+    ProfilePeriod expected = new ProfilePeriod(AUG2016, 1, TimeUnit.HOURS);
+
+    // round-trip Kryo serialization
+    byte[] raw = SerDeUtils.toBytes(expected);
+    Object actual = SerDeUtils.fromBytes(raw, Object.class);
+
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Ensure that the ProfilePeriod can undergo Java serialization, should a user
+   * prefer that over Kryo serialization, which can occur when the Profiler is running
+   * in Storm.
+   */
+  @Test
+  public void testJavaSerialization() throws Exception {
+
+    ProfilePeriod expected = new ProfilePeriod(AUG2016, 1, TimeUnit.HOURS);
+
+    // serialize using java
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(bytes);
+    out.writeObject(expected);
+
+    // the serialized bits
+    byte[] raw = bytes.toByteArray();
+    assertTrue(raw.length > 0);
+
+    // deserialize using java
+    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw));
+    Object actual = in.readObject();
+
+    // ensure that the round-trip was successful
+    assertEquals(expected, actual);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md
index 01918aa..79cdd44 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -123,7 +123,7 @@ Creating and refining profiles is an iterative process.  Iterating against a liv
 	[Stellar]>>> %functions PROFILER
 	PROFILER_APPLY, PROFILER_FLUSH, PROFILER_INIT
 	```  
-	
+
 1. Create a simple `hello-world` profile that will count the number of messages for each `ip_src_addr`.  The `SHELL_EDIT` function will open an editor in which you can copy/paste the following Profiler configuration.
 	```
 	[Stellar]>>> conf := SHELL_EDIT()
@@ -142,12 +142,12 @@ Creating and refining profiles is an iterative process.  Iterating against a liv
 	}
 	```
 
-1. Create a Profile execution environment; the Profile Debugger. 
+1. Create a Profile execution environment; the Profile Debugger.
 
 	The Profiler will output the number of profiles that have been defined, the number of messages that have been applied and the number of routes that have been followed.  
 
 	A route is defined when a message is applied to a specific profile.
-	* If a message is not needed by any profile, then there are no routes. 
+	* If a message is not needed by any profile, then there are no routes.
 	* If a message is needed by one profile, then one route has been followed.
 	* If a message is needed by two profiles, then two routes have been followed.
 
@@ -157,7 +157,7 @@ Creating and refining profiles is an iterative process.  Iterating against a liv
 	Profiler{1 profile(s), 0 messages(s), 0 route(s)}
 	```
 
-1. Create a message that mimics the telemetry that your profile will consume. 
+1. Create a message that mimics the telemetry that your profile will consume.
 
 	This message can be as simple or complex as you like.  For the `hello-world` profile, all you need is a message containing an `ip_src_addr` field.
 
@@ -181,11 +181,11 @@ Creating and refining profiles is an iterative process.  Iterating against a liv
 	```
 
 1. Flush the Profiler.  
-	
-	A flush is what occurs at the end of each 15 minute period in the Profiler.  The result is a list of Profile Measurements. Each measurement is a map containing detailed information about the profile data that has been generated. The `value` field is what is written to HBase when running this profile in the Profiler topology. 
-	
+
+	A flush is what occurs at the end of each 15 minute period in the Profiler.  The result is a list of Profile Measurements. Each measurement is a map containing detailed information about the profile data that has been generated. The `value` field is what is written to HBase when running this profile in the Profiler topology.
+
 	There will always be one measurement for each [profile, entity] pair.  This profile simply counts the number of messages by IP source address. Notice that the value is '3' for the entity '10.0.0.1' as we applied 3 messages with an 'ip_src_addr' of ’10.0.0.1'.
-	
+
 	```
 	[Stellar]>>> values := PROFILER_FLUSH(profiler)
 	[Stellar]>>> values
@@ -197,9 +197,9 @@ Creating and refining profiles is an iterative process.  Iterating against a liv
 
 	Once you are happy with your profile against a controlled data set, it can be useful to introduce more complex, live data.  This example extracts 10 messages of live, enriched telemetry to test your profile(s).
 	```
-	[Stellar]>>> %define bootstrap.servers := "node1:6667" 
+	[Stellar]>>> %define bootstrap.servers := "node1:6667"
 	node1:6667
-	[Stellar]>>> msgs := KAFKA_GET("indexing", 10) 
+	[Stellar]>>> msgs := KAFKA_GET("indexing", 10)
 	[Stellar]>>> LENGTH(msgs)
 	10
 	```
@@ -228,7 +228,7 @@ Continuing the previous running example, at this point, you have seen how your p
 	[Stellar]>>>
 	[Stellar]>>> %functions CONFIG CONFIG_GET, CONFIG_PUT
 	```
-	
+
 1. If you haven't already, define your profile.
 	```
 	[Stellar]>>> conf := SHELL_EDIT()
@@ -250,7 +250,7 @@ Continuing the previous running example, at this point, you have seen how your p
 1. Check what is already deployed.  
 
 	Pushing a new profile configuration is destructive.  It will overwrite any existing configuration.  Check what you have out there.  Manually merge the existing configuration with your new profile definition.
-	
+
 	```
 	[Stellar]>>> existing := CONFIG_GET("PROFILER")
 	```
@@ -260,7 +260,7 @@ Continuing the previous running example, at this point, you have seen how your p
 	[Stellar]>>> CONFIG_PUT("PROFILER", conf)
 	```
 
-### Deploying Profiles from the Command Line 
+### Deploying Profiles from the Command Line
 
 1. Create the profile definition in a file located at `$METRON_HOME/config/zookeeper/profiler.json`.  This file will likely not exist, if you have never created Profiles before.
 
@@ -364,7 +364,7 @@ Indicates whether processing time or event time is used. By default, processing
 
 By default, no `timestampField` is defined.  In this case, the Profiler uses system time when generating profiles.  This means that the profiles are generated based on when the data has been processed by the Profiler.  This is also known as 'processing time'.
 
-This is the simplest mode of operation, but has some draw backs.  If the Profiler is consuming live data and all is well, the processing and event times will likely remain similar and consistent. If processing time diverges from event time, then the Profiler will generate skewed profiles. 
+This is the simplest mode of operation, but has some draw backs.  If the Profiler is consuming live data and all is well, the processing and event times will likely remain similar and consistent. If processing time diverges from event time, then the Profiler will generate skewed profiles.
 
 There are a few scenarios that might cause skewed profiles when using processing time.  For example when a system has undergone a scheduled maintenance window and is restarted, a high volume of messages will need to be processed by the Profiler. The output of the Profiler might indicate an increase in activity during this time, although no change in activity actually occurred on the target network. The same situation could occur if an upstream system which provides telemetry undergoes an outage.  
 
@@ -380,7 +380,7 @@ Alternatively, a `timestampField` can be defined.  This must be the name of a fi
 
 * The Profiler will use the same field across all telemetry sources and for all profiles.
 
-* Be aware of clock skew across telemetry sources.  If your profile is processing telemetry from multiple sources where the clock differs significantly, the Profiler may assume that some of those messages are late and will be ignored.  Adjusting the [`profiler.window.duration`](#profilerwindowduration) and [`profiler.window.lag`](#profilerwindowlag) can help accommodate skewed clocks. 
+* Be aware of clock skew across telemetry sources.  If your profile is processing telemetry from multiple sources where the clock differs significantly, the Profiler may assume that some of those messages are late and will be ignored.  Adjusting the [`profiler.window.duration`](#profilerwindowduration) and [`profiler.window.lag`](#profilerwindowlag) can help accommodate skewed clocks.
 
 ### Profiles
 
@@ -516,14 +516,12 @@ The REPL can be a powerful for developing profiles. Read all about [Developing P
 
 ## Configuring the Profiler
 
-The Profiler runs as an independent Storm topology.  The configuration for the Profiler topology is stored in local filesystem at `$METRON_HOME/config/profiler.properties`.
-The values can be changed on disk and then the Profiler topology must be restarted.
-
+The Profiler runs as an independent Storm topology.  The configuration for the Profiler topology is stored in local filesystem at `$METRON_HOME/config/profiler.properties`. After changing these values, the Profiler topology must be restarted for the changes to take effect.
 
 | Setting                                                                       | Description
 |---                                                                            |---
 | [`profiler.input.topic`](#profilerinputtopic)                                 | The name of the input Kafka topic.
-| [`profiler.output.topic`](#profileroutputtopic)                               | The name of the output Kafka topic. 
+| [`profiler.output.topic`](#profileroutputtopic)                               | The name of the output Kafka topic.
 | [`profiler.period.duration`](#profilerperiodduration)                         | The duration of each profile period.  
 | [`profiler.period.duration.units`](#profilerperioddurationunits)              | The units used to specify the [`profiler.period.duration`](#profilerperiodduration).
 | [`profiler.window.duration`](#profilerwindowduration)                         | The duration of each profile window.
@@ -539,6 +537,8 @@ The values can be changed on disk and then the Profiler topology must be restart
 | [`profiler.hbase.column.family`](#profilerhbasecolumnfamily)                  | The column family used to store profiles.
 | [`profiler.hbase.batch`](#profilerhbasebatch)                                 | The number of puts that are written to HBase in a single batch.
 | [`profiler.hbase.flush.interval.seconds`](#profilerhbaseflushintervalseconds) | The maximum number of seconds between batch writes to HBase.
+| [`topology.kryo.register`](#topologykryoregister)                             | Storm will use Kryo serialization for these classes.
+
 
 ### `profiler.input.topic`
 
@@ -654,6 +654,30 @@ The number of puts that are written to HBase in a single batch.
 
 The maximum number of seconds between batch writes to HBase.
 
+### `topology.kryo.register`
+
+*Default*:
+```
+[ org.apache.metron.profiler.ProfileMeasurement, \
+  org.apache.metron.profiler.ProfilePeriod, \
+  org.apache.metron.common.configuration.profiler.ProfileResult, \
+  org.apache.metron.common.configuration.profiler.ProfileResultExpressions, \
+  org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, \
+  org.apache.metron.common.configuration.profiler.ProfilerConfig, \
+  org.apache.metron.common.configuration.profiler.ProfileConfig, \
+  org.json.simple.JSONObject, \
+  java.util.LinkedHashMap, \
+  org.apache.metron.statistics.OnlineStatisticsProvider ]
+```               
+
+Storm will use Kryo serialization for these classes. Kryo serialization is more performant than Java serialization, in most cases.  
+
+For these classes, Storm will use Kryo's `FieldSerializer` as defined in the [Storm Serialization docs](http://storm.apache.org/releases/1.1.2/Serialization.html).  For all other classes not in this list, Storm defaults to using Java serialization which is slower and not recommended for a production topology.
+
+This value should only need to be altered if you have defined a profile that results in a non-primitive, user-defined type that is not in this list.  If the class is not defined in this list, Java serialization will be used and the class must adhere to Java's serialization requirements.  
+
+The performance of the entire Profiler topology can be negatively impacted if any profile produces results that undergo Java serialization.
+
 ## Examples
 
 The following examples are intended to highlight the functionality provided by the Profiler. Try out these examples easily in the Stellar Shell as described in the [Creating Profiles](#creating-profiles) section.

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index fe3c475..dc30838 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -26,6 +26,19 @@ profiler.workers=1
 profiler.executors=0
 topology.message.timeout.secs=30
 topology.max.spout.pending=100000
+topology.fall.back.on.java.serialization=true
+topology.testing.always.try.serialize=false
+topology.kryo.register=[ org.apache.metron.profiler.ProfileMeasurement, \
+    org.apache.metron.profiler.ProfilePeriod, \
+    org.apache.metron.common.configuration.profiler.ProfileResult, \
+    org.apache.metron.common.configuration.profiler.ProfileResultExpressions, \
+    org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, \
+    org.apache.metron.common.configuration.profiler.ProfilerConfig, \
+    org.apache.metron.common.configuration.profiler.ProfileConfig, \
+    org.json.simple.JSONObject, \
+    org.json.simple.JSONArray, \
+    java.util.LinkedHashMap, \
+    org.apache.metron.statistics.OnlineStatisticsProvider ]
 
 ##### Profiler #####
 

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 6ad007b..5e92c62 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -23,6 +23,9 @@ config:
     topology.auto-credentials: ${topology.auto-credentials}
     topology.message.timeout.secs: ${topology.message.timeout.secs}
     topology.max.spout.pending: ${topology.max.spout.pending}
+    topology.testing.always.try.serialize: ${topology.testing.always.try.serialize}
+    topology.fall.back.on.java.serialization: ${topology.fall.back.on.java.serialization}
+    topology.kryo.register: ${topology.kryo.register}
 
 components:
 

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json b/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json
new file mode 100644
index 0000000..083e73f
--- /dev/null
+++ b/metron-analytics/metron-profiler/src/test/config/zookeeper/profile-with-stats/profiler.json
@@ -0,0 +1,12 @@
+{
+  "profiles": [
+    {
+      "profile": "profile-with-stats",
+      "foreach": "'global'",
+      "init":   { "stats": "STATS_INIT()" },
+      "update": { "stats": "STATS_ADD(stats, 1)" },
+      "result": "stats"
+    }
+  ],
+  "timestampField": "timestamp"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 8f5ced3..c02c469 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.metron.profiler.integration;
 
+import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -45,6 +46,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -94,6 +96,23 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   private static String message3;
 
   /**
+   * [
+   *    org.apache.metron.profiler.ProfileMeasurement,
+   *    org.apache.metron.profiler.ProfilePeriod,
+   *    org.apache.metron.common.configuration.profiler.ProfileResult,
+   *    org.apache.metron.common.configuration.profiler.ProfileResultExpressions,
+   *    org.apache.metron.common.configuration.profiler.ProfileTriageExpressions,
+   *    org.apache.metron.common.configuration.profiler.ProfilerConfig,
+   *    org.apache.metron.common.configuration.profiler.ProfileConfig,
+   *    org.json.simple.JSONObject,
+   *    java.util.LinkedHashMap,
+   *    org.apache.metron.statistics.OnlineStatisticsProvider
+   *  ]
+   */
+  @Multiline
+  private static String kryoSerializers;
+
+  /**
    * The Profiler can generate profiles based on processing time.  With processing time,
    * the Profiler builds profiles based on when the telemetry was processed.
    *
@@ -164,10 +183,60 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
     // embedded in the row key should match those in the source telemetry
     byte[] expectedRowKey = generateExpectedRowKey("event-time-test", entity, startAt);
     byte[] actualRowKey = puts.get(0).getRow();
-    String msg = String.format("expected '%s', got '%s'",
-            new String(expectedRowKey, "UTF-8"),
-            new String(actualRowKey, "UTF-8"));
-    assertArrayEquals(msg, expectedRowKey, actualRowKey);
+    assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey);
+  }
+
+  /**
+   * The result produced by a Profile has to be serializable within Storm. If the result is not
+   * serializable the topology will crash and burn.
+   *
+   * This test ensures that if a profile returns a STATS object created using the STATS_INIT and
+   * STATS_ADD functions, that it can be correctly serialized and persisted.
+   */
+  @Test
+  public void testProfileWithStatsObject() throws Exception {
+
+    // upload the profiler config to zookeeper
+    uploadConfig(TEST_RESOURCES + "/config/zookeeper/profile-with-stats");
+
+    // start the topology and write test messages to kafka
+    fluxComponent.submitTopology();
+    kafkaComponent.writeMessages(inputTopic, message1);
+    kafkaComponent.writeMessages(inputTopic, message2);
+    kafkaComponent.writeMessages(inputTopic, message3);
+
+    // wait until the profile is flushed
+    waitOrTimeout(() -> profilerTable.getPutLog().size() > 0, timeout(seconds(90)));
+
+    // ensure that a value was persisted in HBase
+    List<Put> puts = profilerTable.getPutLog();
+    assertEquals(1, puts.size());
+
+    // generate the expected row key. only the profile name, entity, and period are used to generate the row key
+    ProfileMeasurement measurement = new ProfileMeasurement()
+            .withProfileName("profile-with-stats")
+            .withEntity("global")
+            .withPeriod(startAt, periodDurationMillis, TimeUnit.MILLISECONDS);
+    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDurationMillis, TimeUnit.MILLISECONDS);
+    byte[] expectedRowKey = rowKeyBuilder.rowKey(measurement);
+
+    // ensure the correct row key was generated
+    byte[] actualRowKey = puts.get(0).getRow();
+    assertArrayEquals(failMessage(expectedRowKey, actualRowKey), expectedRowKey, actualRowKey);
+  }
+
+  /**
+   * Generates an error message for when the byte comparison fails.
+   *
+   * @param expected The expected value.
+   * @param actual The actual value.
+   * @return
+   * @throws UnsupportedEncodingException
+   */
+  private String failMessage(byte[] expected, byte[] actual) throws UnsupportedEncodingException {
+    return String.format("expected '%s', got '%s'",
+              new String(expected, "UTF-8"),
+              new String(actual, "UTF-8"));
   }
 
   /**
@@ -249,6 +318,12 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       setProperty("topology.message.timeout.secs", "60");
       setProperty("topology.max.spout.pending", "100000");
 
+      // ensure tuples are serialized during the test, otherwise serialization problems
+      // will not be found until the topology is run on a cluster with multiple workers
+      setProperty("topology.testing.always.try.serialize", "true");
+      setProperty("topology.fall.back.on.java.serialization", "false");
+      setProperty("topology.kryo.register", kryoSerializers);
+
       // kafka settings
       setProperty("profiler.input.topic", inputTopic);
       setProperty("profiler.output.topic", outputTopic);

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java
index 11460e0..96ab2d4 100644
--- a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java
+++ b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/sampling/UniformSampler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.statistics.sampling;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -25,7 +26,8 @@ import java.util.Random;
  * This is a reservoir sampler without replacement where each element sampled will be included
  * with equal probability in the reservoir.
  */
-public class UniformSampler implements Sampler {
+public class UniformSampler implements Sampler, Serializable {
+
   private List<Object> reservoir;
   private int seen = 0;
   private int size;
@@ -83,7 +85,6 @@ public class UniformSampler implements Sampler {
 
     if (getSize() != that.getSize()) return false;
     return reservoir != null ? reservoir.equals(that.reservoir) : that.reservoir == null;
-
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
index fabdaa7..d8bc13d 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
@@ -26,6 +26,19 @@ profiler.workers={{profiler_topology_workers}}
 profiler.executors={{profiler_acker_executors}}
 topology.message.timeout.secs={{profiler_topology_message_timeout_secs}}
 topology.max.spout.pending={{profiler_topology_max_spout_pending}}
+topology.fall.back.on.java.serialization=true
+topology.testing.always.try.serialize=false
+topology.kryo.register=[ org.apache.metron.profiler.ProfileMeasurement, \
+    org.apache.metron.profiler.ProfilePeriod, \
+    org.apache.metron.common.configuration.profiler.ProfileResult, \
+    org.apache.metron.common.configuration.profiler.ProfileResultExpressions, \
+    org.apache.metron.common.configuration.profiler.ProfileTriageExpressions, \
+    org.apache.metron.common.configuration.profiler.ProfilerConfig, \
+    org.apache.metron.common.configuration.profiler.ProfileConfig, \
+    org.json.simple.JSONObject, \
+    org.json.simple.JSONArray, \
+    java.util.LinkedHashMap, \
+    org.apache.metron.statistics.OnlineStatisticsProvider ]
 
 ##### Profiler #####
 

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
index 55642a9..e2aa54d 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java
@@ -20,10 +20,12 @@ package org.apache.metron.common.configuration.profiler;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
+
 /**
  * Defines the 'result' field of a Profile definition.
  */
-public class ProfileResult {
+public class ProfileResult implements Serializable {
 
   /**
    * A Stellar expression that is executed to produce

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
index 5bcec72..2cada01 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
@@ -20,11 +20,13 @@ package org.apache.metron.common.configuration.profiler;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonValue;
 
+import java.io.Serializable;
+
 /**
  * A Stellar expression that is executed to produce a single
  * measurement that is persisted within the profile store.
  */
-public class ProfileResultExpressions {
+public class ProfileResultExpressions implements Serializable {
 
   private String expression;
 

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
index da02cb2..b1b7175 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonAnySetter;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -32,7 +33,7 @@ import java.util.Map;
  * The result of evaluating each expression are made available, keyed
  * by the given name, to the threat triage process.
  */
-public class ProfileTriageExpressions {
+public class ProfileTriageExpressions implements Serializable {
 
   /**
    * A set of named Stellar expressions.  The name of the expression

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
index e4fa99a..068a4c8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
@@ -56,7 +56,7 @@ public class ProfilerConfig implements Serializable {
    * <p>If a message does NOT contain this field, it will be dropped
    * and not included in any profiles.
    */
-  private Optional<String> timestampField = Optional.empty();
+  private String timestampField = null;
 
   public List<ProfileConfig> getProfiles() {
     return profiles;
@@ -73,24 +73,24 @@ public class ProfilerConfig implements Serializable {
 
   @JsonGetter("timestampField")
   public String getTimestampFieldForJson() {
-    return timestampField.orElse(null);
+    return timestampField;
   }
 
   public Optional<String> getTimestampField() {
-    return timestampField;
+    return Optional.ofNullable(timestampField);
   }
 
   @JsonSetter("timestampField")
   public void setTimestampField(String timestampField) {
-    this.timestampField = Optional.of(timestampField);
+    this.timestampField = timestampField;
   }
 
   public void setTimestampField(Optional<String> timestampField) {
-    this.timestampField = timestampField;
+    this.timestampField = timestampField.orElse(null);
   }
 
   public ProfilerConfig withTimestampField(Optional<String> timestampField) {
-    this.timestampField = timestampField;
+    this.timestampField = timestampField.orElse(null);
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
index 4a89f97..5e2ceb9 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
@@ -47,6 +47,7 @@ import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
 import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer;
 import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer;
 import java.io.ByteArrayInputStream;
+import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationHandler;
@@ -188,7 +189,7 @@ public class SerDeUtils {
 
   public static Serializer SERIALIZER = new Serializer();
 
-  private static class Serializer implements Function<Object, byte[]> {
+  private static class Serializer implements Function<Object, byte[]>, Serializable {
     /**
      * Serializes the given Object into bytes.
      *
@@ -199,9 +200,10 @@ public class SerDeUtils {
     }
   }
 
-  public static class Deserializer<T> implements Function<byte[], T> {
+  public static class Deserializer<T> implements Function<byte[], T>, Serializable {
 
     private Class<T> clazz;
+
     public Deserializer(Class<T> clazz) {
       this.clazz = clazz;
     }
@@ -217,7 +219,6 @@ public class SerDeUtils {
     }
   }
 
-
   private SerDeUtils() {
     // do not instantiate
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
index 1a11811..2cbdfb9 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
@@ -20,9 +20,14 @@
 package org.apache.metron.common.configuration.profiler;
 
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.utils.SerDeUtils;
 import org.junit.Test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -108,7 +113,6 @@ public class ProfilerConfigTest {
   @Test
   public void testFromJSONWithTimestampField() throws IOException {
     ProfilerConfig conf = ProfilerConfig.fromJSON(timestampField);
-
     assertTrue(conf.getTimestampField().isPresent());
   }
 
@@ -206,4 +210,83 @@ public class ProfilerConfigTest {
     ProfilerConfig actual = ProfilerConfig.fromJSON(asJson);
     assertEquals(expected, actual);
   }
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result":   "count"
+   *      },
+   *      {
+   *        "profile": "profile2",
+   *        "foreach": "ip_dst_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result":   "count"
+   *      },
+   *      {
+   *        "profile": "profile3",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result": {
+   *          "profile": "count",
+   *          "triage" : { "count": "count" }
+   *        }
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String profilesToSerialize;
+
+  /**
+   * Ensure that the Profiler configuration can undergo Kryo serialization which
+   * occurs when the Profiler is running in Storm.
+   */
+  @Test
+  public void testKryoSerialization() throws Exception {
+
+    // setup a profiler config to serialize
+    ProfilerConfig expected = ProfilerConfig.fromJSON(profilesToSerialize);
+
+    // round-trip kryo serialization
+    byte[] raw = SerDeUtils.toBytes(expected);
+    Object actual = SerDeUtils.fromBytes(raw, Object.class);
+
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Ensure that the Profiler configuration can undergo Java serialization, should a user
+   * prefer that over Kryo serialization, which can occur when the Profiler is running
+   * in Storm.
+   */
+  @Test
+  public void testJavaSerialization() throws Exception {
+
+    // setup a profiler config to serialize
+    ProfilerConfig expected = ProfilerConfig.fromJSON(profilesToSerialize);
+
+    // serialize using java
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    ObjectOutputStream out = new ObjectOutputStream(bytes);
+    out.writeObject(expected);
+
+    // the serialized bits
+    byte[] raw = bytes.toByteArray();
+    assertTrue(raw.length > 0);
+
+    // deserialize using java
+    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw));
+    Object actual = in.readObject();
+
+    // ensure that the round-trip was successful
+    assertEquals(expected, actual);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java
index 445dca5..f205770 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/BloomFilter.java
@@ -26,10 +26,13 @@ import java.util.function.Function;
 public class BloomFilter<T> implements Serializable {
 
   private static class BloomFunnel<T> implements Funnel<T>, Serializable {
+
     Function<T, byte[]> serializer;
+
     public BloomFunnel(Function<T, byte[]> serializer) {
       this.serializer = serializer;
     }
+
     @Override
     public void funnel(T obj, PrimitiveSink primitiveSink) {
       primitiveSink.putBytes(serializer.apply(obj));
@@ -46,12 +49,13 @@ public class BloomFilter<T> implements Serializable {
     }
   }
 
-  public static class DefaultSerializer<T> implements Function<T, byte[]> {
+  public static class DefaultSerializer<T> implements Function<T, byte[]>, Serializable {
     @Override
     public byte[] apply(T t) {
       return SerDeUtils.toBytes(t);
     }
   }
+
   private com.google.common.hash.BloomFilter<T> filter;
 
   public BloomFilter(Function<T, byte[]> serializer, int expectedInsertions, double falsePositiveRate) {
@@ -61,9 +65,11 @@ public class BloomFilter<T> implements Serializable {
   public boolean mightContain(T key) {
     return filter.mightContain(key);
   }
+
   public void add(T key) {
     filter.put(key);
   }
+
   public void merge(BloomFilter<T> filter2) {
     filter.putAll(filter2.filter);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java
index 9003851..eff4f88 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/SerDeUtils.java
@@ -45,6 +45,7 @@ import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
 import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer;
 import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer;
 import java.io.ByteArrayInputStream;
+import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Modifier;
@@ -88,14 +89,16 @@ public class SerDeUtils {
       UnmodifiableCollectionsSerializer.registerSerializers(ret);
       SynchronizedCollectionsSerializer.registerSerializers(ret);
 
-// custom serializers for non-jdk libs
+      // custom serializers for non-jdk libs
 
-// register CGLibProxySerializer, works in combination with the appropriate action in handleUnregisteredClass (see below)
+      // register CGLibProxySerializer, works in combination with the appropriate action in handleUnregisteredClass (see below)
       ret.register(CGLibProxySerializer.CGLibProxyMarker.class, new CGLibProxySerializer());
-// joda DateTime, LocalDate and LocalDateTime
+
+      // joda DateTime, LocalDate and LocalDateTime
       ret.register(LocalDate.class, new JodaLocalDateSerializer());
       ret.register(LocalDateTime.class, new JodaLocalDateTimeSerializer());
-// guava ImmutableList, ImmutableSet, ImmutableMap, ImmutableMultimap, UnmodifiableNavigableSet
+
+      // guava ImmutableList, ImmutableSet, ImmutableMap, ImmutableMultimap, UnmodifiableNavigableSet
       ImmutableListSerializer.registerSerializers(ret);
       ImmutableSetSerializer.registerSerializers(ret);
       ImmutableMapSerializer.registerSerializers(ret);
@@ -187,7 +190,7 @@ public class SerDeUtils {
 
   public static Serializer SERIALIZER = new Serializer();
 
-  private static class Serializer implements Function<Object, byte[]> {
+  private static class Serializer implements Function<Object, byte[]>, Serializable {
     /**
      * Serializes the given Object into bytes.
      *
@@ -198,9 +201,10 @@ public class SerDeUtils {
     }
   }
 
-  public static class Deserializer<T> implements Function<byte[], T> {
+  public static class Deserializer<T> implements Function<byte[], T>, Serializable {
 
     private Class<T> clazz;
+
     public Deserializer(Class<T> clazz) {
       this.clazz = clazz;
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/9ce4ba5a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
index d5f267e..4ad5a40 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
@@ -19,6 +19,7 @@
 package org.apache.metron.stellar.common.utils;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.metron.stellar.common.StellarPredicateProcessor;
 import org.apache.metron.stellar.common.StellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
@@ -28,6 +29,11 @@ import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.metron.stellar.dsl.VariableResolver;
 import org.junit.Assert;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.AbstractMap;
 import java.util.Collections;
 import java.util.List;
@@ -40,6 +46,10 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 /**
  * Utilities for executing and validating Stellar expressions.
  */
@@ -58,24 +68,95 @@ public class StellarProcessorUtils {
    */
   public static Object run(String expression, Map<String, Object> variables, Context context) {
 
-    // validate the expression
-    StellarProcessor processor = new StellarProcessor();
-    Assert.assertTrue("Invalid expression; expr=" + expression,
-            processor.validate(expression, context));
+    validate(expression, context);
+    Object result = execute(expression, variables, context);
+    ensureKryoSerializable(result, expression);
+    ensureJavaSerializable(result, expression);
+
+    return result;
+  }
+
+  /**
+   * Execute a Stellar expression.
+   *
+   * @param expression The expression to execute.
+   * @param variables The variables available to the expression.
+   * @param context The execution context.
+   * @return The result of executing the expression.
+   */
+  private static Object execute(String expression, Map<String, Object> variables, Context context) {
 
-    // execute the expression
-    Object ret = processor.parse(
+    StellarProcessor processor = new StellarProcessor();
+    Object result = processor.parse(
             expression,
             new DefaultVariableResolver(x -> variables.get(x), x -> variables.containsKey(x)),
             StellarFunctions.FUNCTION_RESOLVER(),
             context);
 
-    // ensure the result can be serialized/deserialized
-    byte[] raw = SerDeUtils.toBytes(ret);
+    return result;
+  }
+
+  /**
+   * Ensure that a value can be serialized and deserialized using Kryo.
+   *
+   * <p>When a Stellar function is used in a Storm topology there are cases when the result
+   * needs to be serializable, like when using the Profiler.  Storm can use either Kryo or
+   * basic Java serialization.  It is highly recommended that all Stellar functions return a
+   * result that is Kryo serializable to allow for the broadest possible use of the function.
+   *
+   * @param value The value to validate.
+   */
+  private static void ensureKryoSerializable(Object value, String expression) {
+
+    String msg = String.format("Expression result is not Kryo serializable. It is highly recommended for all " +
+            "functions to return a result that is Kryo serializable to allow for their broadest possible use. " +
+            "expr=%s, value=%s", expression, value);
+
+    byte[] raw = SerDeUtils.toBytes(value);
     Object actual = SerDeUtils.fromBytes(raw, Object.class);
-    Assert.assertEquals(ret, actual);
+    Assert.assertEquals(msg, value, actual);
+  }
 
-    return ret;
+  /**
+   * Ensure a value can be serialized and deserialized using Java serialization.
+   *
+   * <p>When a Stellar function is used in a Storm topology there are cases when the result
+   * needs to be serializable, like when using the Profiler.  Storm can use either Kryo or
+   * basic Java serialization.  It is highly recommended that all Stellar functions return a
+   * result that is Java serializable to allow for the broadest possible use of the function.
+   *
+   * @param value The value to serialize.
+   */
+  private static void ensureJavaSerializable(Object value, String expression) {
+
+    String msg = String.format("Expression result is not Java serializable. It is highly recommended for all " +
+            "functions to return a result that is Java serializable to allow for their broadest possible use. " +
+            "expr=%s, value=%s", expression, value);
+
+    try {
+      // serialize using java
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+      ObjectOutputStream out = new ObjectOutputStream(bytes);
+      out.writeObject(value);
+
+      // the serialized bits
+      byte[] raw = bytes.toByteArray();
+      assertTrue(raw.length > 0);
+
+      // deserialize using java
+      ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw));
+      Object actual = in.readObject();
+
+      // ensure that the round-trip was successful
+      assertEquals(msg, value, actual);
+
+    } catch(IOException | ClassNotFoundException e) {
+
+      String error = String.format("Expression result is not Java serializable. It is highly recommended for all " +
+              "functions to return a result that is Java serializable to allow for their broadest possible use. " +
+              "expr=%s, value=%s, error=%s", expression, value, ExceptionUtils.getRootCauseMessage(e));
+      fail(error);
+    }
   }
 
   /**