You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2019/11/25 16:31:16 UTC
[metron] branch master updated: METRON-2285 Batch Profiler Cannot
Persist Data Sketches (nickwallen via mmiklavc) closes apache/metron#1564
This is an automated email from the ASF dual-hosted git repository.
mmiklavcic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 101d427 METRON-2285 Batch Profiler Cannot Persist Data Sketches (nickwallen via mmiklavc) closes apache/metron#1564
101d427 is described below
commit 101d4275ce09b819b2312407e427469370f69ef8
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Mon Nov 25 09:30:59 2019 -0700
METRON-2285 Batch Profiler Cannot Persist Data Sketches (nickwallen via mmiklavc) closes apache/metron#1564
---
.../metron/profiler/spark/BatchProfiler.java | 5 +-
.../profiler/spark/ProfileMeasurementAdapter.java | 132 ---------------------
.../spark/function/HBaseWriterFunction.java | 10 +-
.../spark/function/ProfileBuilderFunction.java | 12 +-
.../spark/BatchProfilerIntegrationTest.java | 57 ++++++---
.../spark/function/HBaseWriterFunctionTest.java | 21 ++--
.../spark/function/ProfileBuilderFunctionTest.java | 10 +-
7 files changed, 69 insertions(+), 178 deletions(-)
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index 43b42be..96e1880 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -22,6 +22,7 @@ package org.apache.metron.profiler.spark;
import com.google.common.collect.Maps;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.spark.function.GroupByPeriodFunction;
import org.apache.metron.profiler.spark.function.HBaseWriterFunction;
import org.apache.metron.profiler.spark.function.MessageRouterFunction;
@@ -91,9 +92,9 @@ public class BatchProfiler implements Serializable {
LOG.debug("Generated {} message route(s)", routes.cache().count());
// build the profiles
- Dataset<ProfileMeasurementAdapter> measurements = routes
+ Dataset<ProfileMeasurement> measurements = routes
.groupByKey(new GroupByPeriodFunction(profilerProps), Encoders.STRING())
- .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.kryo(ProfileMeasurementAdapter.class));
+ .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.kryo(ProfileMeasurement.class));
LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
// write the profile measurements to HBase
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
deleted file mode 100644
index 5da7d04..0000000
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.metron.profiler.spark;
-
-import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.ProfilePeriod;
-
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * An adapter for the {@link ProfileMeasurement} class so that the data
- * can be serialized as required by Spark.
- *
- * <p>The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object` well. This
- * adapter encodes the profile's result as byte[] rather than an Object to work around this.
- */
-public class ProfileMeasurementAdapter implements Serializable {
-
- /**
- * The name of the profile that this measurement is associated with.
- */
- private String profileName;
-
- /**
- * The name of the entity being profiled.
- */
- private String entity;
-
- /**
- * A monotonically increasing number identifying the period. The first period is 0
- * and began at the epoch.
- */
- private Long periodId;
-
- /**
- * The duration of each period in milliseconds.
- */
- private Long durationMillis;
-
- /**
- * The result of evaluating the profile expression.
- *
- * The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object`. This
- * adapter encodes the profile's result as `byte[]` rather than an `Object` to work around this.
- */
- private byte[] profileValue;
-
- public ProfileMeasurementAdapter() {
- // default constructor required for serialization in Spark
- }
-
- public ProfileMeasurementAdapter(ProfileMeasurement measurement) {
- this.profileName = measurement.getProfileName();
- this.entity = measurement.getEntity();
- this.periodId = measurement.getPeriod().getPeriod();
- this.durationMillis = measurement.getPeriod().getDurationMillis();
- this.profileValue = SerDeUtils.toBytes(measurement.getProfileValue());
- }
-
- public ProfileMeasurement toProfileMeasurement() {
- ProfilePeriod period = ProfilePeriod.fromPeriodId(periodId, durationMillis, TimeUnit.MILLISECONDS);
- ProfileMeasurement measurement = new ProfileMeasurement()
- .withProfileName(profileName)
- .withEntity(entity)
- .withPeriod(period)
- .withProfileValue(SerDeUtils.fromBytes(profileValue, Object.class));
- return measurement;
- }
-
- public String getProfileName() {
- return profileName;
- }
-
- public void setProfileName(String profileName) {
- this.profileName = profileName;
- }
-
- public String getEntity() {
- return entity;
- }
-
- public void setEntity(String entity) {
- this.entity = entity;
- }
-
- public Long getPeriodId() {
- return periodId;
- }
-
- public void setPeriodId(Long periodId) {
- this.periodId = periodId;
- }
-
- public Long getDurationMillis() {
- return durationMillis;
- }
-
- public void setDurationMillis(Long durationMillis) {
- this.durationMillis = durationMillis;
- }
-
- public byte[] getProfileValue() {
- return profileValue;
- }
-
- public void setProfileValue(byte[] profileValue) {
- this.profileValue = profileValue;
- }
-
- public void setProfileValue(Object profileValue) {
- this.profileValue = SerDeUtils.toBytes(profileValue);
- }
-}
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
index cfabd94..afb7241 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
@@ -32,7 +32,6 @@ import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +55,7 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATI
/**
* Writes the profile measurements to HBase in Spark.
*/
-public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasurementAdapter, Integer> {
+public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasurement, Integer> {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -108,20 +107,19 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
* @return The number of measurements written to HBase.
*/
@Override
- public Iterator<Integer> call(Iterator<ProfileMeasurementAdapter> iterator) throws Exception {
+ public Iterator<Integer> call(Iterator<ProfileMeasurement> iterator) throws Exception {
int count = 0;
LOG.debug("About to write profile measurement(s) to HBase");
// do not open hbase connection, if nothing to write
- List<ProfileMeasurementAdapter> measurements = IteratorUtils.toList(iterator);
+ List<ProfileMeasurement> measurements = IteratorUtils.toList(iterator);
if(measurements.size() > 0) {
// open an HBase connection
Configuration config = HBaseConfiguration.create();
try (HBaseClient client = new HBaseClient(tableProvider, config, tableName)) {
- for (ProfileMeasurementAdapter adapter : measurements) {
- ProfileMeasurement m = adapter.toProfileMeasurement();
+ for (ProfileMeasurement m : measurements) {
client.addMutation(rowKeyBuilder.rowKey(m), columnBuilder.columns(m), durability);
}
count = client.mutate();
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
index 7283b48..e052397 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
@@ -23,7 +23,6 @@ import org.apache.metron.profiler.DefaultMessageDistributor;
import org.apache.metron.profiler.MessageDistributor;
import org.apache.metron.profiler.MessageRoute;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
import org.apache.metron.stellar.dsl.Context;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.slf4j.Logger;
@@ -50,7 +49,7 @@ import static org.apache.metron.profiler.spark.function.GroupByPeriodFunction.pr
/**
* The function responsible for building profiles in Spark.
*/
-public class ProfileBuilderFunction implements MapGroupsFunction<String, MessageRoute, ProfileMeasurementAdapter> {
+public class ProfileBuilderFunction implements MapGroupsFunction<String, MessageRoute, ProfileMeasurement> {
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -74,7 +73,7 @@ public class ProfileBuilderFunction implements MapGroupsFunction<String, Message
* @return
*/
@Override
- public ProfileMeasurementAdapter call(String group, Iterator<MessageRoute> iterator) {
+ public ProfileMeasurement call(String group, Iterator<MessageRoute> iterator) {
// create the distributor; some settings are unnecessary because it is cleaned-up immediately after processing the batch
int maxRoutes = Integer.MAX_VALUE;
long profileTTLMillis = Long.MAX_VALUE;
@@ -93,13 +92,12 @@ public class ProfileBuilderFunction implements MapGroupsFunction<String, Message
}
// flush the profile
- ProfileMeasurementAdapter result;
+ ProfileMeasurement result;
List<ProfileMeasurement> measurements = distributor.flush();
if(measurements.size() == 1) {
- ProfileMeasurement m = measurements.get(0);
- result = new ProfileMeasurementAdapter(m);
+ result = measurements.get(0);
LOG.debug("Profile measurement created; profile={}, entity={}, period={}, value={}",
- m.getProfileName(), m.getEntity(), m.getPeriod().getPeriod(), m.getProfileValue());
+ result.getProfileName(), result.getEntity(), result.getPeriod().getPeriod(), result.getProfileValue());
} else if(measurements.size() == 0) {
String msg = format("No profile measurement can be calculated. Review the profile for bugs. profile=%s, entity=%s, period=%s",
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index 72fd283..1e9f32b 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -19,24 +19,23 @@
*/
package org.apache.metron.profiler.spark;
-import com.google.common.collect.Maps;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.profiler.MessageRoute;
import org.apache.metron.profiler.client.stellar.FixedLookback;
import org.apache.metron.profiler.client.stellar.GetProfile;
import org.apache.metron.profiler.client.stellar.WindowLookback;
-import org.apache.metron.profiler.spark.function.MessageRouterFunction;
+import org.apache.metron.statistics.StellarStatisticsFunctions;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.ConversionFunctions;
+import org.apache.metron.stellar.dsl.functions.DataStructureFunctions;
+import org.apache.metron.stellar.dsl.functions.StringFunctions;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
-import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
-import org.json.simple.JSONObject;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -46,9 +45,7 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -69,7 +66,6 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INP
import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
import static org.apache.metron.profiler.spark.reader.TelemetryReaders.ORC;
import static org.apache.metron.profiler.spark.reader.TelemetryReaders.PARQUET;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -130,7 +126,11 @@ public class BatchProfilerIntegrationTest {
new SimpleFunctionResolver()
.withClass(GetProfile.class)
.withClass(FixedLookback.class)
- .withClass(WindowLookback.class),
+ .withClass(WindowLookback.class)
+ .withClass(DataStructureFunctions.Length.class)
+ .withClass(StringFunctions.GetFirst.class)
+ .withClass(StellarStatisticsFunctions.Count.class)
+ .withClass(StellarStatisticsFunctions.Sum.class),
new Context.Builder()
.with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
.build());
@@ -353,14 +353,14 @@ public class BatchProfilerIntegrationTest {
* "foreach": "ip_src_addr",
* "init": { "count": "STATS_INIT()" },
* "update": { "count" : "STATS_ADD(count, 1)" },
- * "result": "TO_INTEGER(STATS_COUNT(count))"
+ * "result": "count"
* },
* {
* "profile": "total-count",
* "foreach": "'total'",
* "init": { "count": "STATS_INIT()" },
* "update": { "count": "STATS_ADD(count, 1)" },
- * "result": "TO_INTEGER(STATS_COUNT(count))"
+ * "result": "count"
* },
* {
* "profile": "response-body-len",
@@ -368,7 +368,7 @@ public class BatchProfilerIntegrationTest {
* "foreach": "ip_src_addr",
* "init": { "len": "STATS_INIT()" },
* "update": { "len": "STATS_ADD(len, response_body_len)" },
- * "result": "TO_INTEGER(STATS_SUM(len))"
+ * "result": "len"
* }
* ]
* }
@@ -384,8 +384,36 @@ public class BatchProfilerIntegrationTest {
BatchProfiler profiler = new BatchProfiler();
profiler.run(spark, profilerProperties, getGlobals(), readerProperties, fromJSON(statsProfileJson));
- // the profiles do the exact same counting, but using the STATS functions
- validateProfiles();
+ /*
+ * the profiles here do the exact same counting as the other test cases, but instead use the STATS
+ * functions. the profiles actually store a data sketch, rather than a single numerical value.
+ * the data sketch is then retrieved and used to calculate the expected counts as part of the test
+ * case validation.
+ */
+
+ // the 'window' looks up to 5 hours before the max timestamp, which in the test data is around July 7, 2018
+ assign("maxTimestamp", "1530978728982L");
+ assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
+
+ // there are 26 messages where ip_src_addr = 192.168.66.1
+ assign("sketches","PROFILE_GET('count-by-ip', '192.168.66.1', window)");
+ assign("sketch", "GET_FIRST(sketches)");
+ assertTrue(execute("26 == STATS_COUNT(sketch)", Boolean.class));
+
+ // there are 74 messages where ip_src_addr = 192.168.138.158
+ assign("sketches","PROFILE_GET('count-by-ip', '192.168.138.158', window)");
+ assign("sketch", "GET_FIRST(sketches)");
+ assertTrue(execute("74 == STATS_COUNT(sketch)", Boolean.class));
+
+ // there are 100 messages in all
+ assign("sketches","PROFILE_GET('total-count', 'total', window)");
+ assign("sketch", "GET_FIRST(sketches)");
+ assertTrue(execute("100 == STATS_COUNT(sketch)", Boolean.class));
+
+ // check the sum of the `response_body_len` field
+ assign("sketches","PROFILE_GET('response-body-len', '192.168.138.158', window)");
+ assign("sketch", "GET_FIRST(sketches)");
+ assertTrue(execute("1029726 == STATS_SUM(sketch)", Boolean.class));
}
/**
@@ -427,6 +455,7 @@ public class BatchProfilerIntegrationTest {
*/
private void assign(String var, String expression) {
executor.assign(var, expression, Collections.emptyMap());
+ LOG.debug("{} = {}", var, executor.getState().get(var));
}
/**
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
index 55f3e21..a54c9bf 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
@@ -23,7 +23,7 @@ import org.apache.commons.collections4.IteratorUtils;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Before;
@@ -61,7 +61,7 @@ public class HBaseWriterFunctionTest {
ProfileConfig profile = getProfile();
// setup the profile measurements that will be written
- List<ProfileMeasurementAdapter> measurements = createMeasurements(1, entity, timestamp, profile);
+ List<ProfileMeasurement> measurements = createMeasurements(1, entity, timestamp, profile);
// setup the function to test
HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
@@ -85,7 +85,7 @@ public class HBaseWriterFunctionTest {
ProfileConfig profile = getProfile();
// setup the profile measurements that will be written
- List<ProfileMeasurementAdapter> measurements = createMeasurements(10, entity, timestamp, profile);
+ List<ProfileMeasurement> measurements = createMeasurements(10, entity, timestamp, profile);
// setup the function to test
HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
@@ -104,7 +104,7 @@ public class HBaseWriterFunctionTest {
public void testWriteNone() throws Exception {
// there are no profile measurements to write
- List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
+ List<ProfileMeasurement> measurements = new ArrayList<>();
// setup the function to test
HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
@@ -128,17 +128,15 @@ public class HBaseWriterFunctionTest {
* @param profile The profile definition.
* @return
*/
- private List<ProfileMeasurementAdapter> createMeasurements(int count, String entity, long timestamp, ProfileConfig profile) {
- List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
+ private List<ProfileMeasurement> createMeasurements(int count, String entity, long timestamp, ProfileConfig profile) {
+ List<ProfileMeasurement> measurements = new ArrayList<>();
for(int i=0; i<count; i++) {
- ProfileMeasurement measurement = new ProfileMeasurement()
+ measurements.add(new ProfileMeasurement()
.withProfileName(profile.getProfile())
.withEntity(entity)
- .withPeriod(timestamp, 15, TimeUnit.MINUTES);
-
- // wrap the measurement using the adapter
- measurements.add(new ProfileMeasurementAdapter(measurement));
+ .withPeriod(timestamp, 15, TimeUnit.MINUTES)
+ .withProfileValue(new OnlineStatisticsProvider()));
}
return measurements;
@@ -171,6 +169,5 @@ public class HBaseWriterFunctionTest {
.withForeach("ip_src_addr")
.withUpdate("count", "count + 1")
.withResult("count");
-
}
}
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
index 9e1e0b3..9e2a37f 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
@@ -22,8 +22,8 @@ package org.apache.metron.profiler.spark.function;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
-import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
@@ -75,13 +75,13 @@ public class ProfileBuilderFunctionTest {
// build the profile
ProfileBuilderFunction function = new ProfileBuilderFunction(profilerProperties, getGlobals());
- ProfileMeasurementAdapter measurement = function.call("profile1-192.168.1.1-0", routes.iterator());
+ ProfileMeasurement measurement = function.call("profile1-192.168.1.1-0", routes.iterator());
// validate the measurement
Assert.assertEquals(entity, measurement.getEntity());
Assert.assertEquals(profile.getProfile(), measurement.getProfileName());
- Assert.assertEquals(routes.size(), measurement.toProfileMeasurement().getProfileValue());
- Assert.assertEquals(expectedPeriod.getPeriod(), (long) measurement.getPeriodId());
+ Assert.assertEquals(routes.size(), measurement.getProfileValue());
+ Assert.assertEquals(expectedPeriod.getPeriod(), (long) measurement.getPeriod().getPeriod());
}
/**
@@ -114,7 +114,7 @@ public class ProfileBuilderFunctionTest {
// an exception should be thrown, if there is a bug in the profile definition
ProfileBuilderFunction function = new ProfileBuilderFunction(profilerProperties, getGlobals());
- ProfileMeasurementAdapter measurement = function.call("profile1-192.168.1.1-0", routes.iterator());
+ function.call("profile1-192.168.1.1-0", routes.iterator());
}
private JSONObject getMessage() {