You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/15 15:41:47 UTC

incubator-metron git commit: METRON-415: Allow a Profile to Store Any Type as its Value closes apache/incubator-metron#253

Repository: incubator-metron
Updated Branches:
  refs/heads/master baf0d24a4 -> 728133b91


METRON-415: Allow a Profile to Store Any Type as its Value closes apache/incubator-metron#253


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/728133b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/728133b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/728133b9

Branch: refs/heads/master
Commit: 728133b9115a0d9774a97fdc43063d8aef9df300
Parents: baf0d24
Author: cstella <ce...@gmail.com>
Authored: Thu Sep 15 11:41:30 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu Sep 15 11:41:30 2016 -0400

----------------------------------------------------------------------
 .../profiler/client/HBaseProfilerClient.java    |   4 +-
 .../profiler/client/stellar/GetProfile.java     |   2 +-
 .../metron/profiler/hbase/Serializer.java       |  92 -------
 .../profiler/hbase/ValueOnlyColumnBuilder.java  |   3 +-
 .../metron/profiler/hbase/SerializerTest.java   |  71 -----
 .../integration/ProfilerIntegrationTest.java    |   5 +-
 metron-platform/metron-common/pom.xml           |  14 +
 .../dsl/functions/DataStructureFunctions.java   |   4 +-
 .../apache/metron/common/utils/BloomFilter.java |  23 ++
 .../apache/metron/common/utils/SerDeUtils.java  | 256 +++++++++++++++++++
 .../metron/common/utils/SerializationUtils.java |  63 -----
 .../stellar/StellarStatisticsFunctionsTest.java |  74 +++++-
 .../metron/common/stellar/StellarTest.java      |  18 +-
 .../metron/common/utils/SerDeUtilsTest.java     | 216 ++++++++++++++++
 14 files changed, 608 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
index 97691d4..cef2ea4 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.Serializer;
+import org.apache.metron.common.utils.SerDeUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -108,7 +108,7 @@ public class HBaseProfilerClient implements ProfilerClient {
       Arrays.stream(results)
               .filter(r -> r.containsColumn(columnFamily, columnQualifier))
               .map(r -> r.getValue(columnFamily, columnQualifier))
-              .forEach(val -> values.add(Serializer.fromBytes(val, clazz)));
+              .forEach(val -> values.add(SerDeUtils.fromBytes(val, clazz)));
 
     } catch(IOException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index ffe9470..d96419f 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -142,7 +142,7 @@ public class GetProfile implements StellarFunction {
     TimeUnit units = TimeUnit.valueOf(unitsName);
     List<Object> groups = getGroupsArg(4, args);
 
-    return client.fetch(profile, entity, durationAgo, units, Integer.class, groups);
+    return client.fetch(profile, entity, durationAgo, units, Object.class, groups);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java
deleted file mode 100644
index c0fe16f..0000000
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.hbase;
-
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Provides basic functionality to serialize and deserialize the allowed
- * value types for a ProfileMeasurement.
- */
-public class Serializer {
-
-  private Serializer() {
-    // do not instantiate
-  }
-
-  /**
-   * Serialize a profile measurement's value.
-   *
-   * The value produced by a Profile definition can be any numeric data type.  The data
-   * type depends on how the profile is defined by the user.  The user should be able to
-   * choose the data type that is most suitable for their use case.
-   *
-   * @param value The value to serialize.
-   */
-  public static byte[] toBytes(Object value) {
-    byte[] result;
-
-    if(value instanceof Integer) {
-      result = Bytes.toBytes((Integer) value);
-    } else if(value instanceof Double) {
-      result = Bytes.toBytes((Double) value);
-    } else if(value instanceof Short) {
-      result = Bytes.toBytes((Short) value);
-    } else if(value instanceof Long) {
-      result = Bytes.toBytes((Long) value);
-    } else if(value instanceof Float) {
-      result = Bytes.toBytes((Float) value);
-    } else {
-      throw new RuntimeException("Expected 'Number': actual=" + value);
-    }
-
-    return result;
-  }
-
-  /**
-   * Deserialize a profile measurement's value.
-   *
-   * The value produced by a Profile definition can be any numeric data type.  The data
-   * type depends on how the profile is defined by the user.  The user should be able to
-   * choose the data type that is most suitable for their use case.
-   *
-   * @param value The value to deserialize.
-   */
-  public static <T> T fromBytes(byte[] value, Class<T> clazz) {
-    T result;
-
-    if(clazz == Integer.class) {
-      result = clazz.cast(Bytes.toInt(value));
-    } else if(clazz == Double.class) {
-      result = clazz.cast(Bytes.toDouble(value));
-    } else if(clazz == Short.class) {
-      result = clazz.cast(Bytes.toShort(value));
-    } else if(clazz == Long.class) {
-      result = clazz.cast(Bytes.toLong(value));
-    } else if(clazz == Float.class) {
-      result = clazz.cast(Bytes.toFloat(value));
-    } else {
-      throw new RuntimeException("Expected 'Number': actual=" + clazz);
-    }
-
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
index aeda317..cc6aa5a 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
@@ -21,6 +21,7 @@
 package org.apache.metron.profiler.hbase;
 
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.utils.SerDeUtils;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.storm.hbase.common.ColumnList;
 
@@ -48,7 +49,7 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
   public ColumnList columns(ProfileMeasurement measurement) {
 
     ColumnList cols = new ColumnList();
-    cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), Serializer.toBytes(measurement.getValue()));
+    cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), SerDeUtils.toBytes(measurement.getValue()));
 
     return cols;
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java
deleted file mode 100644
index 69de4ba..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.hbase;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the Serializer.
- */
-public class SerializerTest {
-
-  @Test
-  public void testInteger() {
-    final int expected = 2;
-    byte[] raw = Serializer.toBytes(expected);
-    int actual = Serializer.fromBytes(raw, Integer.class);
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  public void testDouble() {
-    final double expected = 2.0;
-    byte[] raw = Serializer.toBytes(expected);
-    double actual = Serializer.fromBytes(raw, Double.class);
-    assertEquals(expected, actual, 0.01);
-  }
-
-  @Test
-  public void testShort() {
-    final short expected = 2;
-    byte[] raw = Serializer.toBytes(expected);
-    short actual = Serializer.fromBytes(raw, Short.class);
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  public void testLong() {
-    final long expected = 2L;
-    byte[] raw = Serializer.toBytes(expected);
-    long actual = Serializer.fromBytes(raw, Long.class);
-    assertEquals(expected, actual);
-  }
-
-  @Test
-  public void testFloat() {
-    final Float expected = 2.2F;
-    byte[] raw = Serializer.toBytes(expected);
-    float actual = Serializer.fromBytes(raw, Float.class);
-    assertEquals(expected, actual, 0.01);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 91191c8..80b4399 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.spout.kafka.SpoutConfig;
+import org.apache.metron.common.utils.SerDeUtils;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
@@ -227,7 +228,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     for (Result result : scanner) {
       byte[] raw = result.getValue(cf, columnQual);
-      return Bytes.toDouble(raw);
+      return SerDeUtils.fromBytes(raw, Double.class);
     }
 
     throw new IllegalStateException("No results found");
@@ -243,7 +244,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
     for (Result result : scanner) {
       byte[] raw = result.getValue(cf, columnQual);
-      return Bytes.toInt(raw);
+      return SerDeUtils.fromBytes(raw, Integer.class);
     }
 
     throw new IllegalStateException("No results found");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 737de08..e14cbd8 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -73,6 +73,7 @@
             <version>${global_storm_version}</version>
             <scope>provided</scope>
             <exclusions>
+
                 <exclusion>
                     <artifactId>servlet-api</artifactId>
                     <groupId>javax.servlet</groupId>
@@ -230,6 +231,11 @@
             <version>0.9.10</version>
         </dependency>
         <dependency>
+            <groupId>de.javakaffee</groupId>
+            <artifactId>kryo-serializers</artifactId>
+            <version>0.38</version>
+        </dependency>
+        <dependency>
             <groupId>com.tdunning</groupId>
             <artifactId>t-digest</artifactId>
             <version>3.1</version>
@@ -318,6 +324,14 @@
                                     <pattern>org.apache.commons.beanutils</pattern>
                                     <shadedPattern>org.apache.metron.beanutils</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>com.esotericsoftware</pattern>
+                                    <shadedPattern>org.apache.metron.esotericsoftware</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>de.javakaffee</pattern>
+                                    <shadedPattern>org.apache.metron.javakaffee</shadedPattern>
+                                </relocation>
                             </relocations>
                             <transformers>
                                 <transformer

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
index b71391c..8f1caf2 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
@@ -21,7 +21,7 @@ import org.apache.metron.common.dsl.BaseStellarFunction;
 import org.apache.metron.common.dsl.Stellar;
 import org.apache.metron.common.utils.BloomFilter;
 import org.apache.metron.common.utils.ConversionUtils;
-import org.apache.metron.common.utils.SerializationUtils;
+import org.apache.metron.common.utils.SerDeUtils;
 
 import java.util.Collection;
 import java.util.List;
@@ -98,7 +98,7 @@ public class DataStructureFunctions {
       if(args.size() > 2) {
         falsePositiveRate= ConversionUtils.convert(args.get(1), Float.class);
       }
-      return new BloomFilter<>(SerializationUtils.INSTANCE, expectedInsertions, falsePositiveRate);
+      return new BloomFilter<>(SerDeUtils.SERIALIZER, expectedInsertions, falsePositiveRate);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
index 82172f5..ec2ecfd 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
@@ -45,7 +45,15 @@ public class BloomFilter<T> implements Serializable {
       return super.hashCode() * 31;
     }
   }
+
+  public static class DefaultSerializer<T> implements Function<T, byte[]> {
+    @Override
+    public byte[] apply(T t) {
+      return SerDeUtils.toBytes(t);
+    }
+  }
   private com.google.common.hash.BloomFilter<T> filter;
+
   public BloomFilter(Function<T, byte[]> serializer, int expectedInsertions, double falsePositiveRate) {
     filter = com.google.common.hash.BloomFilter.create(new BloomFunnel<T>(serializer), expectedInsertions, falsePositiveRate);
   }
@@ -60,4 +68,19 @@ public class BloomFilter<T> implements Serializable {
     filter.putAll(filter2.filter);
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    BloomFilter<?> that = (BloomFilter<?>) o;
+
+    return filter != null ? filter.equals(that.filter) : that.filter == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    return filter != null ? filter.hashCode() : 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
new file mode 100644
index 0000000..b9ea816
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
@@ -0,0 +1,256 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.common.utils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.Util;
+import com.esotericsoftware.reflectasm.ConstructorAccess;
+import de.javakaffee.kryoserializers.*;
+import de.javakaffee.kryoserializers.cglib.CGLibProxySerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.storm.shade.org.joda.time.DateTime;
+import org.objenesis.instantiator.ObjectInstantiator;
+import org.objenesis.strategy.InstantiatorStrategy;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Modifier;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.function.Function;
+
+import static com.esotericsoftware.kryo.util.Util.className;
+
+/**
+ * Static helpers that serialize arbitrary objects to bytes, and back again,
+ * using a per-thread Kryo instance (values are written with their class information).
+ */
+public class SerDeUtils {
+  protected static final Logger LOG = LoggerFactory.getLogger(SerDeUtils.class);
+  private static ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
+    @Override
+    protected Kryo initialValue() {
+      Kryo ret = new Kryo();
+      ret.setReferences(true);
+      ret.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
+
+      ret.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
+      ret.register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer());
+      ret.register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer());
+      ret.register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer());
+      ret.register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer());
+      ret.register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer());
+      ret.register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer());
+      ret.register(GregorianCalendar.class, new GregorianCalendarSerializer());
+      ret.register(InvocationHandler.class, new JdkProxySerializer());
+      UnmodifiableCollectionsSerializer.registerSerializers(ret);
+      SynchronizedCollectionsSerializer.registerSerializers(ret);
+
+// custom serializers for non-jdk libs (cglib, joda-time, guava)
+
+// register CGLibProxySerializer. NOTE(review): no handleUnregisteredClass hook is defined in this class - confirm CGLib proxies actually reach this serializer
+      ret.register(CGLibProxySerializer.CGLibProxyMarker.class, new CGLibProxySerializer());
+// Joda-Time serializers. NOTE(review): DateTime is imported from storm's shaded joda and LocalDate/LocalDateTime from java.time - confirm these pairings
+      ret.register(DateTime.class, new JodaDateTimeSerializer());
+      ret.register(LocalDate.class, new JodaLocalDateSerializer());
+      ret.register(LocalDateTime.class, new JodaLocalDateTimeSerializer());
+// guava ImmutableList, ImmutableSet, ImmutableMap and ImmutableMultimap
+      ImmutableListSerializer.registerSerializers(ret);
+      ImmutableSetSerializer.registerSerializers(ret);
+      ImmutableMapSerializer.registerSerializers(ret);
+      ImmutableMultimapSerializer.registerSerializers(ret);
+      return ret;
+    }
+  };
+
+  /**
+   * Backported from a more recent version of kryo than we currently run.  Instantiation is attempted, in order,
+   * through a ReflectASM ConstructorAccess (skipped on Android and for non-static member classes), then through
+   * the no-arg constructor via plain reflection, and only then through the optional fallback strategy; when no
+   * fallback is set a KryoException is thrown.  The ordering exists for performance (this is in the critical path).
+   */
+  static private class DefaultInstantiatorStrategy implements org.objenesis.strategy.InstantiatorStrategy {
+    private InstantiatorStrategy fallbackStrategy;
+
+    public DefaultInstantiatorStrategy () {
+    }
+
+    public DefaultInstantiatorStrategy (InstantiatorStrategy fallbackStrategy) {
+      this.fallbackStrategy = fallbackStrategy;
+    }
+
+    public void setFallbackInstantiatorStrategy (final InstantiatorStrategy fallbackStrategy) {
+      this.fallbackStrategy = fallbackStrategy;
+    }
+
+    public InstantiatorStrategy getFallbackInstantiatorStrategy () {
+      return fallbackStrategy;
+    }
+
+    public ObjectInstantiator newInstantiatorOf (final Class type) {
+      if (!Util.isAndroid) {
+        // Use ReflectASM unless the class is a non-static member (inner) class.
+        Class enclosingType = type.getEnclosingClass();
+        boolean isNonStaticMemberClass = enclosingType != null && type.isMemberClass()
+                && !Modifier.isStatic(type.getModifiers());
+        if (!isNonStaticMemberClass) {
+          try {
+            final ConstructorAccess access = ConstructorAccess.get(type);
+            return new ObjectInstantiator() {
+              public Object newInstance () {
+                try {
+                  return access.newInstance();
+                } catch (Exception ex) {
+                  throw new KryoException("Error constructing instance of class: " + className(type), ex);
+                }
+              }
+            };
+          } catch (Exception ignored) {
+          }
+        }
+      }
+      // Plain reflection: the public no-arg constructor first, then a declared one made accessible.
+      try {
+        Constructor ctor;
+        try {
+          ctor = type.getConstructor((Class[])null);
+        } catch (Exception ex) {
+          ctor = type.getDeclaredConstructor((Class[])null);
+          ctor.setAccessible(true);
+        }
+        final Constructor constructor = ctor;
+        return new ObjectInstantiator() {
+          public Object newInstance () {
+            try {
+              return constructor.newInstance();
+            } catch (Exception ex) {
+              throw new KryoException("Error constructing instance of class: " + className(type), ex);
+            }
+          }
+        };
+      } catch (Exception ignored) {
+      }
+      if (fallbackStrategy == null) {
+        if (type.isMemberClass() && !Modifier.isStatic(type.getModifiers()))
+          throw new KryoException("Class cannot be created (non-static member class): " + className(type));
+        else
+          throw new KryoException("Class cannot be created (missing no-arg constructor): " + className(type));
+      }
+      // Neither constructor path produced an instantiator: delegate to the fallback strategy.
+      return fallbackStrategy.newInstantiatorOf(type);
+    }
+  }
+
+  public static Serializer SERIALIZER = new Serializer();
+
+  private static class Serializer implements Function<Object, byte[]> {
+    /**
+     * Serializes the given object by delegating to {@link SerDeUtils#toBytes(Object)}.
+     * @return the serialized bytes
+     */
+    @Override
+    public byte[] apply(Object o) {
+      return toBytes(o);
+    }
+  }
+
+  public static class Deserializer<T> implements Function<byte[], T> {
+
+    private Class<T> clazz;
+    public Deserializer(Class<T> clazz) {
+      this.clazz = clazz;
+    }
+    /**
+     * Deserializes the given bytes by delegating to {@link SerDeUtils#fromBytes(byte[], Class)}.
+     *
+     * @param bytes the serialized form to read
+     * @return the deserialized object, cast to the class supplied at construction
+     */
+    @Override
+    public T apply(byte[] bytes) {
+      return fromBytes(bytes, clazz);
+    }
+  }
+
+
+  private SerDeUtils() {
+    // do not instantiate
+  }
+
+  /**
+   * Serialize a value, such as a profile measurement's value, into bytes.
+   *
+   * The value is written with Kryo's writeClassAndObject, so its class travels with the data
+   * and it need not be numeric.  Failures are logged and rethrown as an IllegalStateException.
+   *
+   * @param value The value to serialize.
+   * @return The serialized bytes.
+   */
+  public static byte[] toBytes(Object value) {
+    try {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Output output = new Output(bos);
+      kryo.get().writeClassAndObject(output, value);
+      output.flush();
+      bos.flush();
+      return bos.toByteArray();
+    }
+    catch(Throwable t) {
+      LOG.error("Unable to serialize: " + value + " because " + t.getMessage(), t);
+      throw new IllegalStateException("Unable to serialize " + value + " because " + t.getMessage(), t);
+    }
+  }
+
+  /**
+   * Deserialize a value from bytes; the counterpart of {@link #toBytes(Object)}.
+   *
+   * The object is read with Kryo's readClassAndObject and then cast to the requested class.
+   * Any failure is logged and the original throwable is rethrown unchanged.
+   *
+   * @param value The bytes to deserialize.
+   * @param clazz The class the deserialized object is cast to.
+   */
+  public static <T> T fromBytes(byte[] value, Class<T> clazz) {
+    try {
+      Input input = new Input(new ByteArrayInputStream(value));
+      return clazz.cast(kryo.get().readClassAndObject(input));
+    }
+    catch(Throwable t) {
+      LOG.error("Unable to deserialize  because " + t.getMessage(), t);
+      throw t;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java
deleted file mode 100644
index 96916af..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.metron.common.utils;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-import java.util.function.Function;
-
-public enum SerializationUtils implements Function<Object, byte[]> {
-  INSTANCE;
-  protected static final Logger LOG = LoggerFactory.getLogger(SerializationUtils.class);
-  ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
-
-    @Override
-    protected Kryo initialValue() {
-      return new Kryo();
-    }
-  };
-
-  /**
-   * Applies this function to the given argument.
-   *
-   * @param t the function argument
-   * @return the function result
-   */
-  @Override
-  public byte[] apply(Object t) {
-    try {
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      Output output = new Output(bos);
-      kryo.get().writeObject(output, t);
-      output.flush();
-      byte[] ret = bos.toByteArray();
-      return ret;
-    }
-    catch(Throwable ex) {
-      LOG.error("Unable to serialize " + t + ": " + ex.getMessage(), ex);
-      throw ex;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
index 016d2c9..8cfa8a3 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
@@ -21,7 +21,6 @@
 package org.apache.metron.common.stellar;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
 import org.apache.commons.math3.random.GaussianRandomGenerator;
 import org.apache.commons.math3.random.MersenneTwister;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
@@ -31,12 +30,15 @@ import org.apache.metron.common.dsl.ParseException;
 import org.apache.metron.common.dsl.StellarFunctions;
 import org.apache.metron.common.math.stats.OnlineStatisticsProviderTest;
 import org.apache.metron.common.math.stats.StatisticsProvider;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.*;
+import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -65,6 +67,37 @@ public class StellarStatisticsFunctionsTest {
     return Arrays.asList(new Object[][] {{ 0 }, { 100 }});
   }
 
+  /**
+   * Compares one statistic across two providers without a tolerance.
+   *
+   * Delegates to the four-argument overload, passing a null epsilon.
+   *
+   * @param func Extracts the statistic to compare from a provider.
+   * @param left The first provider.
+   * @param right The second provider.
+   */
+  private static void tolerantAssertEquals(Function<StatisticsProvider, Number> func,
+                                           StatisticsProvider left,
+                                           StatisticsProvider right) {
+    final Double noTolerance = null;
+    tolerantAssertEquals(func, left, right, noTolerance);
+  }
+
+  /**
+   * Asserts that one statistic has the same value on two providers.
+   *
+   * The statistic is extracted from each provider with func.  When an epsilon is given the
+   * two values are compared as doubles within that tolerance; otherwise they are compared
+   * with equals.  If extracting the statistic throws UnsupportedOperationException, the
+   * comparison is skipped rather than failed.
+   *
+   * @param func Extracts the statistic to compare from a provider.
+   * @param left The provider holding the expected value.
+   * @param right The provider holding the actual value.
+   * @param epsilon The allowed absolute difference, or null to require equality.
+   */
+  private static void tolerantAssertEquals( Function<StatisticsProvider, Number> func
+                                          , StatisticsProvider left
+                                          , StatisticsProvider right
+                                          , Double epsilon
+                                          )
+  {
+    try {
+      Number leftVal = func.apply(left);
+      // Must be read from 'right'; reading 'left' twice would make the assertion vacuous.
+      Number rightVal = func.apply(right);
+      if(epsilon != null) {
+        // doubleValue() works for any Number, whereas a (double) cast only works for Double.
+        Assert.assertEquals(leftVal.doubleValue(), rightVal.doubleValue(), epsilon);
+      }
+      else {
+        Assert.assertEquals(leftVal, rightVal);
+      }
+    }
+    catch(UnsupportedOperationException ignored) {
+      // Deliberately skipped: the statistic is not supported, so there is nothing to compare.
+    }
+  }
+
   /**
    * Runs a Stellar expression.
    * @param expr The expression to run.
@@ -72,7 +105,44 @@ public class StellarStatisticsFunctionsTest {
    */
   private static Object run(String expr, Map<String, Object> variables) {
     StellarProcessor processor = new StellarProcessor();
-    return processor.parse(expr, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());
+    Object ret = processor.parse(expr, x-> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());
+    byte[] raw = SerDeUtils.toBytes(ret); // round-trip the result through SerDeUtils
+    Object actual = SerDeUtils.fromBytes(raw, Object.class); // deserialized copy, used only for the checks below
+    if(ret instanceof StatisticsProvider) {
+      StatisticsProvider left = (StatisticsProvider)ret;
+      StatisticsProvider right = (StatisticsProvider)actual;
+      //N (count), compared without a tolerance
+      tolerantAssertEquals(prov -> prov.getCount(), left, right);
+      //sum
+      tolerantAssertEquals(prov -> prov.getSum(), left, right, 1e-3);
+      //sum of squares
+      tolerantAssertEquals(prov -> prov.getSumSquares(), left, right, 1e-3);
+      //sum of logs
+      tolerantAssertEquals(prov -> prov.getSumLogs(), left, right, 1e-3);
+      //Mean
+      tolerantAssertEquals(prov -> prov.getMean(), left, right, 1e-3);
+      //Quadratic Mean
+      tolerantAssertEquals(prov -> prov.getQuadraticMean(), left, right, 1e-3);
+      //SD
+      tolerantAssertEquals(prov -> prov.getStandardDeviation(), left, right, 1e-3);
+      //Variance
+      tolerantAssertEquals(prov -> prov.getVariance(), left, right, 1e-3);
+      //Min
+      tolerantAssertEquals(prov -> prov.getMin(), left, right, 1e-3);
+      //Max
+      tolerantAssertEquals(prov -> prov.getMax(), left, right, 1e-3);
+      //Kurtosis
+      tolerantAssertEquals(prov -> prov.getKurtosis(), left, right, 1e-3);
+      //Skewness
+      tolerantAssertEquals(prov -> prov.getSkewness(), left, right, 1e-3);
+      for (double d = 10.0; d < 100.0; d += 10) {
+        final double pctile = d;
+        //This is a sketch, so we're a bit more forgiving here in our choice of \epsilon.
+        tolerantAssertEquals(prov -> prov.getPercentile(pctile), left, right, 1e-2);
+
+      }
+    }
+    return ret; // the original result is returned, not the deserialized copy
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java
index 4afa0e8..4f01026 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.dsl.*;
+import org.apache.metron.common.utils.SerDeUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.reflections.Reflections;
@@ -394,10 +395,25 @@ public class StellarTest {
   public static Object run(String rule, Map<String, Object> variables) {
     return run(rule, variables, Context.EMPTY_CONTEXT());
   }
+
+  /**
+   * This ensures the basic contract of a stellar expression is adhered to:
+   * 1. Validate works on the expression
+   * 2. The output can be serialized and deserialized properly
+   *
+   * @param rule The Stellar expression to validate and then evaluate.
+   * @param variables The variables available to the expression, looked up by name.
+   * @param context The context passed to both validation and evaluation.
+   * @return The result of evaluating the expression (the original object, not the deserialized copy).
+   */
   public static Object run(String rule, Map<String, Object> variables, Context context) {
     StellarProcessor processor = new StellarProcessor();
     Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
-    return processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context);
+    Object ret = processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context);
+    byte[] raw = SerDeUtils.toBytes(ret);
+    Object actual = SerDeUtils.fromBytes(raw, Object.class);
+    Assert.assertEquals(ret, actual);
+    return ret;
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/SerDeUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/SerDeUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/SerDeUtilsTest.java
new file mode 100644
index 0000000..88ce664
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/SerDeUtilsTest.java
@@ -0,0 +1,216 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.common.utils;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link SerDeUtils}.
+ *
+ * Each test serializes a value with {@code SerDeUtils.toBytes}, reads it back with
+ * {@code SerDeUtils.fromBytes} and asserts that the result matches the original.
+ */
+public class SerDeUtilsTest {
+
+  /**
+   * An integer survives a round trip, whether read back as Integer or as Object.
+   */
+  @Test
+  public void testInteger() {
+    final int expected = 2;
+    byte[] raw = SerDeUtils.toBytes(expected);
+    {
+      int actual = SerDeUtils.fromBytes(raw, Integer.class);
+      assertEquals(expected, actual);
+    }
+    {
+      int actual = (int) SerDeUtils.fromBytes(raw, Object.class);
+      assertEquals(expected, actual);
+    }
+  }
+
+  /**
+   * A double survives a round trip, whether read back as Double or as Object.
+   */
+  @Test
+  public void testDouble() {
+    final double expected = 2.0;
+    byte[] raw = SerDeUtils.toBytes(expected);
+    {
+      double actual = SerDeUtils.fromBytes(raw, Double.class);
+      assertEquals(expected, actual, 0.01);
+    }
+    {
+      double actual = (double) SerDeUtils.fromBytes(raw, Object.class);
+      assertEquals(expected, actual, 0.01);
+    }
+  }
+
+  /**
+   * A short survives a round trip, whether read back as Short or as Object.
+   */
+  @Test
+  public void testShort() {
+    final short expected = 2;
+    byte[] raw = SerDeUtils.toBytes(expected);
+    {
+      short actual = SerDeUtils.fromBytes(raw, Short.class);
+      assertEquals(expected, actual);
+    }
+    {
+      short actual = (short) SerDeUtils.fromBytes(raw, Object.class);
+      assertEquals(expected, actual);
+    }
+  }
+
+  /**
+   * A long survives a round trip, whether read back as Long or as Object.
+   */
+  @Test
+  public void testLong() {
+    final long expected = 2L;
+    byte[] raw = SerDeUtils.toBytes(expected);
+    {
+      long actual = SerDeUtils.fromBytes(raw, Long.class);
+      assertEquals(expected, actual);
+    }
+    {
+      long actual = (Long) SerDeUtils.fromBytes(raw, Object.class);
+      assertEquals(expected, actual);
+    }
+  }
+
+  /**
+   * A float survives a round trip, whether read back as Float or as Object.
+   */
+  @Test
+  public void testFloat() {
+    final Float expected = 2.2F;
+    byte[] raw = SerDeUtils.toBytes(expected);
+    {
+      float actual = SerDeUtils.fromBytes(raw, Float.class);
+      assertEquals(expected, actual, 0.01);
+    }
+    {
+      float actual = (float) SerDeUtils.fromBytes(raw, Object.class);
+      assertEquals(expected, actual, 0.01);
+    }
+  }
+
+  /**
+   * A map holding values of different types survives a round trip.
+   */
+  @Test
+  public void testMap() {
+    final Map<String, Object> expected = new HashMap<>();
+    expected.put("foo", "bar");
+    expected.put("bar", 1.0);
+    byte[] raw = SerDeUtils.toBytes(expected);
+    Object actual = SerDeUtils.fromBytes(raw, Object.class);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * A list of strings survives a round trip.
+   */
+  @Test
+  public void testList() {
+    final List<String> expected = new ArrayList<>();
+    expected.add("foo");
+    expected.add("bar");
+    byte[] raw = SerDeUtils.toBytes(expected);
+    Object actual = SerDeUtils.fromBytes(raw, Object.class);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * A bloom filter survives a round trip: the copy equals the original and still
+   * answers membership queries for the added and the never-added items.
+   */
+  @Test
+  @SuppressWarnings("unchecked") // the deserialized Object is known to be the BloomFilter<Object> written above
+  public void testBloomFilter() {
+    final BloomFilter<Object> expected = new BloomFilter<>(new BloomFilter.DefaultSerializer<>(), 10000, 0.01);
+    expected.add("foo");
+    expected.add("bar");
+    byte[] raw = SerDeUtils.toBytes(expected);
+    BloomFilter<Object> actual = (BloomFilter<Object>) SerDeUtils.fromBytes(raw, Object.class);
+    Assert.assertTrue(actual.mightContain("foo"));
+    Assert.assertFalse(actual.mightContain("timothy"));
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * A user-defined bean used to check that arbitrary objects can be serialized.
+   *
+   * It mixes a mutable list, a string, a boxed double, a map and a Guava ImmutableList.
+   * Equality and hash code are defined over all five fields so that a deserialized
+   * copy can be compared with the original.
+   */
+  public static class ArbitraryPojo {
+    private List<String> list = new ArrayList<>();
+    private String string = "foo";
+    private Double d = 1.0;
+    private Map<String, String> map = new HashMap<>();
+    // No accessor exists for this field; it is only reachable by serializing the fields directly.
+    private List<String> immutableList = ImmutableList.of("foo");
+
+    /**
+     * Populates the mutable list and the map with fixed sample entries.
+     */
+    public ArbitraryPojo() {
+      list.add("foo");
+      list.add("bar");
+      map.put("key1", "value1");
+      map.put("key2", "value2");
+
+    }
+
+    public List<String> getList() {
+      return list;
+    }
+
+    public void setList(List<String> list) {
+      this.list = list;
+    }
+
+    public String getString() {
+      return string;
+    }
+
+    public void setString(String string) {
+      this.string = string;
+    }
+
+    public Double getD() {
+      return d;
+    }
+
+    public void setD(Double d) {
+      this.d = d;
+    }
+
+    public Map<String, String> getMap() {
+      return map;
+    }
+
+    public void setMap(Map<String, String> map) {
+      this.map = map;
+    }
+
+    /**
+     * Two instances are equal when they have the same class and all five fields are equal (null-safe).
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ArbitraryPojo that = (ArbitraryPojo) o;
+
+      if (getList() != null ? !getList().equals(that.getList()) : that.getList() != null) return false;
+      if (getString() != null ? !getString().equals(that.getString()) : that.getString() != null) return false;
+      if (getD() != null ? !getD().equals(that.getD()) : that.getD() != null) return false;
+      if (getMap() != null ? !getMap().equals(that.getMap()) : that.getMap() != null) return false;
+      return immutableList != null ? immutableList.equals(that.immutableList) : that.immutableList == null;
+
+    }
+
+    /**
+     * Combines the hash codes of the same five fields that equals compares.
+     */
+    @Override
+    public int hashCode() {
+      int result = getList() != null ? getList().hashCode() : 0;
+      result = 31 * result + (getString() != null ? getString().hashCode() : 0);
+      result = 31 * result + (getD() != null ? getD().hashCode() : 0);
+      result = 31 * result + (getMap() != null ? getMap().hashCode() : 0);
+      result = 31 * result + (immutableList != null ? immutableList.hashCode() : 0);
+      return result;
+    }
+  }
+
+  /**
+   * A user-defined bean survives a round trip and equals the original.
+   */
+  @Test
+  public void testArbitraryPojo() {
+    final ArbitraryPojo expected = new ArbitraryPojo();
+    byte[] raw = SerDeUtils.toBytes(expected);
+    Object actual = SerDeUtils.fromBytes(raw, Object.class);
+    assertEquals(expected, actual);
+  }
+}