You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/15 15:41:47 UTC
incubator-metron git commit: METRON-415: Allow a Profile to Store Any
Type as its Value closes apache/incubator-metron#253
Repository: incubator-metron
Updated Branches:
refs/heads/master baf0d24a4 -> 728133b91
METRON-415: Allow a Profile to Store Any Type as its Value closes apache/incubator-metron#253
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/728133b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/728133b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/728133b9
Branch: refs/heads/master
Commit: 728133b9115a0d9774a97fdc43063d8aef9df300
Parents: baf0d24
Author: cstella <ce...@gmail.com>
Authored: Thu Sep 15 11:41:30 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Thu Sep 15 11:41:30 2016 -0400
----------------------------------------------------------------------
.../profiler/client/HBaseProfilerClient.java | 4 +-
.../profiler/client/stellar/GetProfile.java | 2 +-
.../metron/profiler/hbase/Serializer.java | 92 -------
.../profiler/hbase/ValueOnlyColumnBuilder.java | 3 +-
.../metron/profiler/hbase/SerializerTest.java | 71 -----
.../integration/ProfilerIntegrationTest.java | 5 +-
metron-platform/metron-common/pom.xml | 14 +
.../dsl/functions/DataStructureFunctions.java | 4 +-
.../apache/metron/common/utils/BloomFilter.java | 23 ++
.../apache/metron/common/utils/SerDeUtils.java | 256 +++++++++++++++++++
.../metron/common/utils/SerializationUtils.java | 63 -----
.../stellar/StellarStatisticsFunctionsTest.java | 74 +++++-
.../metron/common/stellar/StellarTest.java | 18 +-
.../metron/common/utils/SerDeUtilsTest.java | 216 ++++++++++++++++
14 files changed, 608 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
index 97691d4..cef2ea4 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.Serializer;
+import org.apache.metron.common.utils.SerDeUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -108,7 +108,7 @@ public class HBaseProfilerClient implements ProfilerClient {
Arrays.stream(results)
.filter(r -> r.containsColumn(columnFamily, columnQualifier))
.map(r -> r.getValue(columnFamily, columnQualifier))
- .forEach(val -> values.add(Serializer.fromBytes(val, clazz)));
+ .forEach(val -> values.add(SerDeUtils.fromBytes(val, clazz)));
} catch(IOException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index ffe9470..d96419f 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -142,7 +142,7 @@ public class GetProfile implements StellarFunction {
TimeUnit units = TimeUnit.valueOf(unitsName);
List<Object> groups = getGroupsArg(4, args);
- return client.fetch(profile, entity, durationAgo, units, Integer.class, groups);
+ return client.fetch(profile, entity, durationAgo, units, Object.class, groups);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java
deleted file mode 100644
index c0fe16f..0000000
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/Serializer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.hbase;
-
-import org.apache.hadoop.hbase.util.Bytes;
-
-/**
- * Provides basic functionality to serialize and deserialize the allowed
- * value types for a ProfileMeasurement.
- */
-public class Serializer {
-
- private Serializer() {
- // do not instantiate
- }
-
- /**
- * Serialize a profile measurement's value.
- *
- * The value produced by a Profile definition can be any numeric data type. The data
- * type depends on how the profile is defined by the user. The user should be able to
- * choose the data type that is most suitable for their use case.
- *
- * @param value The value to serialize.
- */
- public static byte[] toBytes(Object value) {
- byte[] result;
-
- if(value instanceof Integer) {
- result = Bytes.toBytes((Integer) value);
- } else if(value instanceof Double) {
- result = Bytes.toBytes((Double) value);
- } else if(value instanceof Short) {
- result = Bytes.toBytes((Short) value);
- } else if(value instanceof Long) {
- result = Bytes.toBytes((Long) value);
- } else if(value instanceof Float) {
- result = Bytes.toBytes((Float) value);
- } else {
- throw new RuntimeException("Expected 'Number': actual=" + value);
- }
-
- return result;
- }
-
- /**
- * Deserialize a profile measurement's value.
- *
- * The value produced by a Profile definition can be any numeric data type. The data
- * type depends on how the profile is defined by the user. The user should be able to
- * choose the data type that is most suitable for their use case.
- *
- * @param value The value to deserialize.
- */
- public static <T> T fromBytes(byte[] value, Class<T> clazz) {
- T result;
-
- if(clazz == Integer.class) {
- result = clazz.cast(Bytes.toInt(value));
- } else if(clazz == Double.class) {
- result = clazz.cast(Bytes.toDouble(value));
- } else if(clazz == Short.class) {
- result = clazz.cast(Bytes.toShort(value));
- } else if(clazz == Long.class) {
- result = clazz.cast(Bytes.toLong(value));
- } else if(clazz == Float.class) {
- result = clazz.cast(Bytes.toFloat(value));
- } else {
- throw new RuntimeException("Expected 'Number': actual=" + clazz);
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
index aeda317..cc6aa5a 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
@@ -21,6 +21,7 @@
package org.apache.metron.profiler.hbase;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.utils.SerDeUtils;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.storm.hbase.common.ColumnList;
@@ -48,7 +49,7 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
public ColumnList columns(ProfileMeasurement measurement) {
ColumnList cols = new ColumnList();
- cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), Serializer.toBytes(measurement.getValue()));
+ cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), SerDeUtils.toBytes(measurement.getValue()));
return cols;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java
deleted file mode 100644
index 69de4ba..0000000
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/hbase/SerializerTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.hbase;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the Serializer.
- */
-public class SerializerTest {
-
- @Test
- public void testInteger() {
- final int expected = 2;
- byte[] raw = Serializer.toBytes(expected);
- int actual = Serializer.fromBytes(raw, Integer.class);
- assertEquals(expected, actual);
- }
-
- @Test
- public void testDouble() {
- final double expected = 2.0;
- byte[] raw = Serializer.toBytes(expected);
- double actual = Serializer.fromBytes(raw, Double.class);
- assertEquals(expected, actual, 0.01);
- }
-
- @Test
- public void testShort() {
- final short expected = 2;
- byte[] raw = Serializer.toBytes(expected);
- short actual = Serializer.fromBytes(raw, Short.class);
- assertEquals(expected, actual);
- }
-
- @Test
- public void testLong() {
- final long expected = 2L;
- byte[] raw = Serializer.toBytes(expected);
- long actual = Serializer.fromBytes(raw, Long.class);
- assertEquals(expected, actual);
- }
-
- @Test
- public void testFloat() {
- final Float expected = 2.2F;
- byte[] raw = Serializer.toBytes(expected);
- float actual = Serializer.fromBytes(raw, Float.class);
- assertEquals(expected, actual, 0.01);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index 91191c8..80b4399 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.Constants;
import org.apache.metron.common.spout.kafka.SpoutConfig;
+import org.apache.metron.common.utils.SerDeUtils;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.integration.BaseIntegrationTest;
import org.apache.metron.integration.ComponentRunner;
@@ -227,7 +228,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
for (Result result : scanner) {
byte[] raw = result.getValue(cf, columnQual);
- return Bytes.toDouble(raw);
+ return SerDeUtils.fromBytes(raw, Double.class);
}
throw new IllegalStateException("No results found");
@@ -243,7 +244,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
for (Result result : scanner) {
byte[] raw = result.getValue(cf, columnQual);
- return Bytes.toInt(raw);
+ return SerDeUtils.fromBytes(raw, Integer.class);
}
throw new IllegalStateException("No results found");
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 737de08..e14cbd8 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -73,6 +73,7 @@
<version>${global_storm_version}</version>
<scope>provided</scope>
<exclusions>
+
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
@@ -230,6 +231,11 @@
<version>0.9.10</version>
</dependency>
<dependency>
+ <groupId>de.javakaffee</groupId>
+ <artifactId>kryo-serializers</artifactId>
+ <version>0.38</version>
+ </dependency>
+ <dependency>
<groupId>com.tdunning</groupId>
<artifactId>t-digest</artifactId>
<version>3.1</version>
@@ -318,6 +324,14 @@
<pattern>org.apache.commons.beanutils</pattern>
<shadedPattern>org.apache.metron.beanutils</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.esotericsoftware</pattern>
+ <shadedPattern>org.apache.metron.esotericsoftware</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>de.javakaffee</pattern>
+ <shadedPattern>org.apache.metron.javakaffee</shadedPattern>
+ </relocation>
</relocations>
<transformers>
<transformer
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
index b71391c..8f1caf2 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/functions/DataStructureFunctions.java
@@ -21,7 +21,7 @@ import org.apache.metron.common.dsl.BaseStellarFunction;
import org.apache.metron.common.dsl.Stellar;
import org.apache.metron.common.utils.BloomFilter;
import org.apache.metron.common.utils.ConversionUtils;
-import org.apache.metron.common.utils.SerializationUtils;
+import org.apache.metron.common.utils.SerDeUtils;
import java.util.Collection;
import java.util.List;
@@ -98,7 +98,7 @@ public class DataStructureFunctions {
if(args.size() > 2) {
falsePositiveRate= ConversionUtils.convert(args.get(1), Float.class);
}
- return new BloomFilter<>(SerializationUtils.INSTANCE, expectedInsertions, falsePositiveRate);
+ return new BloomFilter<>(SerDeUtils.SERIALIZER, expectedInsertions, falsePositiveRate);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
index 82172f5..ec2ecfd 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/BloomFilter.java
@@ -45,7 +45,15 @@ public class BloomFilter<T> implements Serializable {
return super.hashCode() * 31;
}
}
+
+ public static class DefaultSerializer<T> implements Function<T, byte[]> {
+ @Override
+ public byte[] apply(T t) {
+ return SerDeUtils.toBytes(t);
+ }
+ }
private com.google.common.hash.BloomFilter<T> filter;
+
public BloomFilter(Function<T, byte[]> serializer, int expectedInsertions, double falsePositiveRate) {
filter = com.google.common.hash.BloomFilter.create(new BloomFunnel<T>(serializer), expectedInsertions, falsePositiveRate);
}
@@ -60,4 +68,19 @@ public class BloomFilter<T> implements Serializable {
filter.putAll(filter2.filter);
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ BloomFilter<?> that = (BloomFilter<?>) o;
+
+ return filter != null ? filter.equals(that.filter) : that.filter == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ return filter != null ? filter.hashCode() : 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
new file mode 100644
index 0000000..b9ea816
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerDeUtils.java
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.common.utils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.Util;
+import com.esotericsoftware.reflectasm.ConstructorAccess;
+import de.javakaffee.kryoserializers.*;
+import de.javakaffee.kryoserializers.cglib.CGLibProxySerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer;
+import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.storm.shade.org.joda.time.DateTime;
+import org.objenesis.instantiator.ObjectInstantiator;
+import org.objenesis.strategy.InstantiatorStrategy;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Modifier;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.*;
+import java.util.function.Function;
+
+import static com.esotericsoftware.kryo.util.Util.className;
+
+/**
+ * Provides basic functionality to serialize and deserialize the allowed
+ * value types for a ProfileMeasurement.
+ */
+public class SerDeUtils {
+ protected static final Logger LOG = LoggerFactory.getLogger(SerDeUtils.class);
+ private static ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
+ @Override
+ protected Kryo initialValue() {
+ Kryo ret = new Kryo();
+ ret.setReferences(true);
+ ret.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
+
+ ret.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
+ ret.register(Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer());
+ ret.register(Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer());
+ ret.register(Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer());
+ ret.register(Collections.singletonList("").getClass(), new CollectionsSingletonListSerializer());
+ ret.register(Collections.singleton("").getClass(), new CollectionsSingletonSetSerializer());
+ ret.register(Collections.singletonMap("", "").getClass(), new CollectionsSingletonMapSerializer());
+ ret.register(GregorianCalendar.class, new GregorianCalendarSerializer());
+ ret.register(InvocationHandler.class, new JdkProxySerializer());
+ UnmodifiableCollectionsSerializer.registerSerializers(ret);
+ SynchronizedCollectionsSerializer.registerSerializers(ret);
+
+// custom serializers for non-jdk libs
+
+// register CGLibProxySerializer, works in combination with the appropriate action in handleUnregisteredClass (see below)
+ ret.register(CGLibProxySerializer.CGLibProxyMarker.class, new CGLibProxySerializer());
+// joda DateTime, LocalDate and LocalDateTime
+ ret.register(DateTime.class, new JodaDateTimeSerializer());
+ ret.register(LocalDate.class, new JodaLocalDateSerializer());
+ ret.register(LocalDateTime.class, new JodaLocalDateTimeSerializer());
+// guava ImmutableList, ImmutableSet, ImmutableMap, ImmutableMultimap, UnmodifiableNavigableSet
+ ImmutableListSerializer.registerSerializers(ret);
+ ImmutableSetSerializer.registerSerializers(ret);
+ ImmutableMapSerializer.registerSerializers(ret);
+ ImmutableMultimapSerializer.registerSerializers(ret);
+ return ret;
+ }
+ };
+
+ /**
+ * This was backported from a more recent version of kryo than we currently run. The reason why it exists is
+ * that we want a strategy for instantiation of classes which attempts a no-arg constructor first and THEN falls
+ * back to reflection for performance reasons alone (this is, after all, in the critical path).
+ *
+ */
+ static private class DefaultInstantiatorStrategy implements org.objenesis.strategy.InstantiatorStrategy {
+ private InstantiatorStrategy fallbackStrategy;
+
+ public DefaultInstantiatorStrategy () {
+ }
+
+ public DefaultInstantiatorStrategy (InstantiatorStrategy fallbackStrategy) {
+ this.fallbackStrategy = fallbackStrategy;
+ }
+
+ public void setFallbackInstantiatorStrategy (final InstantiatorStrategy fallbackStrategy) {
+ this.fallbackStrategy = fallbackStrategy;
+ }
+
+ public InstantiatorStrategy getFallbackInstantiatorStrategy () {
+ return fallbackStrategy;
+ }
+
+ public ObjectInstantiator newInstantiatorOf (final Class type) {
+ if (!Util.isAndroid) {
+ // Use ReflectASM if the class is not a non-static member class.
+ Class enclosingType = type.getEnclosingClass();
+ boolean isNonStaticMemberClass = enclosingType != null && type.isMemberClass()
+ && !Modifier.isStatic(type.getModifiers());
+ if (!isNonStaticMemberClass) {
+ try {
+ final ConstructorAccess access = ConstructorAccess.get(type);
+ return new ObjectInstantiator() {
+ public Object newInstance () {
+ try {
+ return access.newInstance();
+ } catch (Exception ex) {
+ throw new KryoException("Error constructing instance of class: " + className(type), ex);
+ }
+ }
+ };
+ } catch (Exception ignored) {
+ }
+ }
+ }
+ // Reflection.
+ try {
+ Constructor ctor;
+ try {
+ ctor = type.getConstructor((Class[])null);
+ } catch (Exception ex) {
+ ctor = type.getDeclaredConstructor((Class[])null);
+ ctor.setAccessible(true);
+ }
+ final Constructor constructor = ctor;
+ return new ObjectInstantiator() {
+ public Object newInstance () {
+ try {
+ return constructor.newInstance();
+ } catch (Exception ex) {
+ throw new KryoException("Error constructing instance of class: " + className(type), ex);
+ }
+ }
+ };
+ } catch (Exception ignored) {
+ }
+ if (fallbackStrategy == null) {
+ if (type.isMemberClass() && !Modifier.isStatic(type.getModifiers()))
+ throw new KryoException("Class cannot be created (non-static member class): " + className(type));
+ else
+ throw new KryoException("Class cannot be created (missing no-arg constructor): " + className(type));
+ }
+ // InstantiatorStrategy.
+ return fallbackStrategy.newInstantiatorOf(type);
+ }
+ }
+
+ public static Serializer SERIALIZER = new Serializer();
+
+ private static class Serializer implements Function<Object, byte[]> {
+ /**
+ * Serializes the given Object into bytes.
+ *
+ */
+ @Override
+ public byte[] apply(Object o) {
+ return toBytes(o);
+ }
+ }
+
+ public static class Deserializer<T> implements Function<byte[], T> {
+
+ private Class<T> clazz;
+ public Deserializer(Class<T> clazz) {
+ this.clazz = clazz;
+ }
+ /**
+ * Deserializes the given bytes.
+ *
+ * @param bytes the function argument
+ * @return the function result
+ */
+ @Override
+ public T apply(byte[] bytes) {
+ return fromBytes(bytes, clazz);
+ }
+ }
+
+
+ private SerDeUtils() {
+ // do not instantiate
+ }
+
+ /**
+ * Serialize a profile measurement's value.
+ *
+ * The value produced by a Profile definition can be any numeric data type. The data
+ * type depends on how the profile is defined by the user. The user should be able to
+ * choose the data type that is most suitable for their use case.
+ *
+ * @param value The value to serialize.
+ */
+ public static byte[] toBytes(Object value) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Output output = new Output(bos);
+ kryo.get().writeClassAndObject(output, value);
+ output.flush();
+ bos.flush();
+ return bos.toByteArray();
+ }
+ catch(Throwable t) {
+ LOG.error("Unable to serialize: " + value + " because " + t.getMessage(), t);
+ throw new IllegalStateException("Unable to serialize " + value + " because " + t.getMessage(), t);
+ }
+ }
+
+ /**
+ * Deserialize a profile measurement's value.
+ *
+ * The value produced by a Profile definition can be any numeric data type. The data
+ * type depends on how the profile is defined by the user. The user should be able to
+ * choose the data type that is most suitable for their use case.
+ *
+ * @param value The value to deserialize.
+ */
+ public static <T> T fromBytes(byte[] value, Class<T> clazz) {
+ try {
+ Input input = new Input(new ByteArrayInputStream(value));
+ return clazz.cast(kryo.get().readClassAndObject(input));
+ }
+ catch(Throwable t) {
+ LOG.error("Unable to deserialize because " + t.getMessage(), t);
+ throw t;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java
deleted file mode 100644
index 96916af..0000000
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/SerializationUtils.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.metron.common.utils;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-import java.util.function.Function;
-
-public enum SerializationUtils implements Function<Object, byte[]> {
- INSTANCE;
- protected static final Logger LOG = LoggerFactory.getLogger(SerializationUtils.class);
- ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
-
- @Override
- protected Kryo initialValue() {
- return new Kryo();
- }
- };
-
- /**
- * Applies this function to the given argument.
- *
- * @param t the function argument
- * @return the function result
- */
- @Override
- public byte[] apply(Object t) {
- try {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output output = new Output(bos);
- kryo.get().writeObject(output, t);
- output.flush();
- byte[] ret = bos.toByteArray();
- return ret;
- }
- catch(Throwable ex) {
- LOG.error("Unable to serialize " + t + ": " + ex.getMessage(), ex);
- throw ex;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
index 016d2c9..8cfa8a3 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarStatisticsFunctionsTest.java
@@ -21,7 +21,6 @@
package org.apache.metron.common.stellar;
import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
import org.apache.commons.math3.random.GaussianRandomGenerator;
import org.apache.commons.math3.random.MersenneTwister;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
@@ -31,12 +30,15 @@ import org.apache.metron.common.dsl.ParseException;
import org.apache.metron.common.dsl.StellarFunctions;
import org.apache.metron.common.math.stats.OnlineStatisticsProviderTest;
import org.apache.metron.common.math.stats.StatisticsProvider;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.*;
+import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -65,6 +67,37 @@ public class StellarStatisticsFunctionsTest {
return Arrays.asList(new Object[][] {{ 0 }, { 100 }});
}
+ private static void tolerantAssertEquals( Function<StatisticsProvider, Number> func
+ , StatisticsProvider left
+ , StatisticsProvider right
+ )
+
+ {
+ tolerantAssertEquals(func, left, right, null);
+ }
+
+ private static void tolerantAssertEquals( Function<StatisticsProvider, Number> func
+ , StatisticsProvider left
+ , StatisticsProvider right
+ , Double epsilon
+ )
+ {
+ try {
+ Number leftVal = func.apply(left);
+ Number rightVal = func.apply(left);
+ if(epsilon != null) {
+ Assert.assertEquals((double)leftVal, (double)rightVal, epsilon);
+ }
+ else {
+ Assert.assertEquals(leftVal, rightVal);
+ }
+ }
+ catch(UnsupportedOperationException uoe) {
+ //ignore
+ }
+
+ }
+
/**
* Runs a Stellar expression.
* @param expr The expression to run.
@@ -72,7 +105,44 @@ public class StellarStatisticsFunctionsTest {
*/
private static Object run(String expr, Map<String, Object> variables) {
StellarProcessor processor = new StellarProcessor();
- return processor.parse(expr, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());
+ Object ret = processor.parse(expr, x-> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());
+ byte[] raw = SerDeUtils.toBytes(ret);
+ Object actual = SerDeUtils.fromBytes(raw, Object.class);
+ if(ret instanceof StatisticsProvider) {
+ StatisticsProvider left = (StatisticsProvider)ret;
+ StatisticsProvider right = (StatisticsProvider)actual;
+ //N
+ tolerantAssertEquals(prov -> prov.getCount(), left, right);
+ //sum
+ tolerantAssertEquals(prov -> prov.getSum(), left, right, 1e-3);
+ //sum of squares
+ tolerantAssertEquals(prov -> prov.getSumSquares(), left, right, 1e-3);
+ //sum of squares
+ tolerantAssertEquals(prov -> prov.getSumLogs(), left, right, 1e-3);
+ //Mean
+ tolerantAssertEquals(prov -> prov.getMean(), left, right, 1e-3);
+ //Quadratic Mean
+ tolerantAssertEquals(prov -> prov.getQuadraticMean(), left, right, 1e-3);
+ //SD
+ tolerantAssertEquals(prov -> prov.getStandardDeviation(), left, right, 1e-3);
+ //Variance
+ tolerantAssertEquals(prov -> prov.getVariance(), left, right, 1e-3);
+ //Min
+ tolerantAssertEquals(prov -> prov.getMin(), left, right, 1e-3);
+ //Max
+ tolerantAssertEquals(prov -> prov.getMax(), left, right, 1e-3);
+ //Kurtosis
+ tolerantAssertEquals(prov -> prov.getKurtosis(), left, right, 1e-3);
+ //Skewness
+ tolerantAssertEquals(prov -> prov.getSkewness(), left, right, 1e-3);
+ for (double d = 10.0; d < 100.0; d += 10) {
+ final double pctile = d;
+ //This is a sketch, so we're a bit more forgiving here in our choice of \epsilon.
+ tolerantAssertEquals(prov -> prov.getPercentile(pctile), left, right, 1e-2);
+
+ }
+ }
+ return ret;
}
@Before
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java
index 4afa0e8..4f01026 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/stellar/StellarTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.metron.common.dsl.*;
+import org.apache.metron.common.utils.SerDeUtils;
import org.junit.Assert;
import org.junit.Test;
import org.reflections.Reflections;
@@ -394,10 +395,25 @@ public class StellarTest {
public static Object run(String rule, Map<String, Object> variables) {
return run(rule, variables, Context.EMPTY_CONTEXT());
}
+
+ /**
+ * This ensures the basic contract of a stellar expression is adhered to:
+ * 1. Validate works on the expression
+ * 2. The output can be serialized and deserialized properly
+ *
+ * @param rule
+ * @param variables
+ * @param context
+ * @return
+ */
public static Object run(String rule, Map<String, Object> variables, Context context) {
StellarProcessor processor = new StellarProcessor();
Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
- return processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context);
+ Object ret = processor.parse(rule, x -> variables.get(x), StellarFunctions.FUNCTION_RESOLVER(), context);
+ byte[] raw = SerDeUtils.toBytes(ret);
+ Object actual = SerDeUtils.fromBytes(raw, Object.class);
+ Assert.assertEquals(ret, actual);
+ return ret;
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/728133b9/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/SerDeUtilsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/SerDeUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/SerDeUtilsTest.java
new file mode 100644
index 0000000..88ce664
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/SerDeUtilsTest.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.common.utils;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the Serializer.
+ */
+public class SerDeUtilsTest {
+
+ @Test
+ public void testInteger() {
+ final int expected = 2;
+ byte[] raw = SerDeUtils.toBytes(expected);
+ int actual = SerDeUtils.fromBytes(raw, Integer.class);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testDouble() {
+ final double expected = 2.0;
+ byte[] raw = SerDeUtils.toBytes(expected);
+ {
+ double actual = SerDeUtils.fromBytes(raw, Double.class);
+ assertEquals(expected, actual, 0.01);
+ }
+ {
+ double actual = (double) SerDeUtils.fromBytes(raw, Object.class);
+ assertEquals(expected, actual, 0.01);
+ }
+ }
+
+ @Test
+ public void testShort() {
+ final short expected = 2;
+ byte[] raw = SerDeUtils.toBytes(expected);
+ {
+ short actual = SerDeUtils.fromBytes(raw, Short.class);
+ assertEquals(expected, actual);
+ }
+ {
+ short actual = (short) SerDeUtils.fromBytes(raw, Object.class);
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void testLong() {
+ final long expected = 2L;
+ byte[] raw = SerDeUtils.toBytes(expected);
+ {
+ long actual = SerDeUtils.fromBytes(raw, Long.class);
+ assertEquals(expected, actual);
+ }
+ {
+ long actual = (Long) SerDeUtils.fromBytes(raw, Object.class);
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void testFloat() {
+ final Float expected = 2.2F;
+ byte[] raw = SerDeUtils.toBytes(expected);
+ {
+ float actual = SerDeUtils.fromBytes(raw, Float.class);
+ assertEquals(expected, actual, 0.01);
+ }
+ {
+ float actual = (float) SerDeUtils.fromBytes(raw, Object.class);
+ assertEquals(expected, actual, 0.01);
+ }
+ }
+
+ @Test
+ public void testMap() {
+ final Map<String, Object> expected = new HashMap<>();
+ expected.put("foo", "bar");
+ expected.put( "bar", 1.0);
+ ;
+ byte[] raw = SerDeUtils.toBytes(expected);
+ Object actual = SerDeUtils.fromBytes(raw, Object.class);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testList() {
+ final List<String> expected = new ArrayList<String>();
+ expected.add("foo");
+ expected.add("bar");
+ byte[] raw = SerDeUtils.toBytes(expected);
+ Object actual = SerDeUtils.fromBytes(raw, Object.class);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testBloomFilter() {
+ final BloomFilter<Object> expected = new BloomFilter<>(new BloomFilter.DefaultSerializer<>(), 10000, 0.01);
+ expected.add("foo");
+ expected.add("bar");
+ byte[] raw = SerDeUtils.toBytes(expected);
+ BloomFilter<Object> actual = (BloomFilter) SerDeUtils.fromBytes(raw, Object.class);
+ Assert.assertTrue(actual.mightContain("foo"));
+ Assert.assertFalse(actual.mightContain("timothy"));
+ assertEquals(expected, actual);
+ }
+
+ public static class ArbitraryPojo {
+ private List<String> list = new ArrayList<>();
+ private String string = "foo";
+ private Double d = 1.0;
+ private Map<String, String> map = new HashMap<>();
+ private List<String> immutableList = ImmutableList.of("foo");
+
+ public ArbitraryPojo() {
+ list.add("foo");
+ list.add("bar");
+ map.put("key1", "value1");
+ map.put("key2", "value2");
+
+ }
+
+ public List<String> getList() {
+ return list;
+ }
+
+ public void setList(List<String> list) {
+ this.list = list;
+ }
+
+ public String getString() {
+ return string;
+ }
+
+ public void setString(String string) {
+ this.string = string;
+ }
+
+ public Double getD() {
+ return d;
+ }
+
+ public void setD(Double d) {
+ this.d = d;
+ }
+
+ public Map<String, String> getMap() {
+ return map;
+ }
+
+ public void setMap(Map<String, String> map) {
+ this.map = map;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ArbitraryPojo that = (ArbitraryPojo) o;
+
+ if (getList() != null ? !getList().equals(that.getList()) : that.getList() != null) return false;
+ if (getString() != null ? !getString().equals(that.getString()) : that.getString() != null) return false;
+ if (getD() != null ? !getD().equals(that.getD()) : that.getD() != null) return false;
+ if (getMap() != null ? !getMap().equals(that.getMap()) : that.getMap() != null) return false;
+ return immutableList != null ? immutableList.equals(that.immutableList) : that.immutableList == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getList() != null ? getList().hashCode() : 0;
+ result = 31 * result + (getString() != null ? getString().hashCode() : 0);
+ result = 31 * result + (getD() != null ? getD().hashCode() : 0);
+ result = 31 * result + (getMap() != null ? getMap().hashCode() : 0);
+ result = 31 * result + (immutableList != null ? immutableList.hashCode() : 0);
+ return result;
+ }
+ }
+
+ @Test
+ public void testArbitraryPojo() {
+ final ArbitraryPojo expected = new ArbitraryPojo();
+ byte[] raw = SerDeUtils.toBytes(expected);
+ Object actual = SerDeUtils.fromBytes(raw, Object.class);
+ assertEquals(expected, actual);
+ }
+}