You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by mm...@apache.org on 2019/11/25 16:31:16 UTC

[metron] branch master updated: METRON-2285 Batch Profiler Cannot Persist Data Sketches (nickwallen via mmiklavc) closes apache/metron#1564

This is an automated email from the ASF dual-hosted git repository.

mmiklavcic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 101d427  METRON-2285 Batch Profiler Cannot Persist Data Sketches (nickwallen via mmiklavc) closes apache/metron#1564
101d427 is described below

commit 101d4275ce09b819b2312407e427469370f69ef8
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Mon Nov 25 09:30:59 2019 -0700

    METRON-2285 Batch Profiler Cannot Persist Data Sketches (nickwallen via mmiklavc) closes apache/metron#1564
---
 .../metron/profiler/spark/BatchProfiler.java       |   5 +-
 .../profiler/spark/ProfileMeasurementAdapter.java  | 132 ---------------------
 .../spark/function/HBaseWriterFunction.java        |  10 +-
 .../spark/function/ProfileBuilderFunction.java     |  12 +-
 .../spark/BatchProfilerIntegrationTest.java        |  57 ++++++---
 .../spark/function/HBaseWriterFunctionTest.java    |  21 ++--
 .../spark/function/ProfileBuilderFunctionTest.java |  10 +-
 7 files changed, 69 insertions(+), 178 deletions(-)

diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index 43b42be..96e1880 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -22,6 +22,7 @@ package org.apache.metron.profiler.spark;
 import com.google.common.collect.Maps;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.spark.function.GroupByPeriodFunction;
 import org.apache.metron.profiler.spark.function.HBaseWriterFunction;
 import org.apache.metron.profiler.spark.function.MessageRouterFunction;
@@ -91,9 +92,9 @@ public class BatchProfiler implements Serializable {
     LOG.debug("Generated {} message route(s)", routes.cache().count());
 
     // build the profiles
-    Dataset<ProfileMeasurementAdapter> measurements = routes
+    Dataset<ProfileMeasurement> measurements = routes
             .groupByKey(new GroupByPeriodFunction(profilerProps), Encoders.STRING())
-            .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.kryo(ProfileMeasurementAdapter.class));
+            .mapGroups(new ProfileBuilderFunction(profilerProps, globals), Encoders.kryo(ProfileMeasurement.class));
     LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
 
     // write the profile measurements to HBase
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
deleted file mode 100644
index 5da7d04..0000000
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/ProfileMeasurementAdapter.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-package org.apache.metron.profiler.spark;
-
-import org.apache.metron.common.utils.SerDeUtils;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.ProfilePeriod;
-
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * An adapter for the {@link ProfileMeasurement} class so that the data
- * can be serialized as required by Spark.
- *
- * <p>The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object` well. This
- * adapter encodes the profile's result as byte[] rather than an Object to work around this.
- */
-public class ProfileMeasurementAdapter implements Serializable {
-
-  /**
-   * The name of the profile that this measurement is associated with.
-   */
-  private String profileName;
-
-  /**
-   * The name of the entity being profiled.
-   */
-  private String entity;
-
-  /**
-   * A monotonically increasing number identifying the period.  The first period is 0
-   * and began at the epoch.
-   */
-  private Long periodId;
-
-  /**
-   * The duration of each period in milliseconds.
-   */
-  private Long durationMillis;
-
-  /**
-   * The result of evaluating the profile expression.
-   *
-   * The `Encoders.bean(Class<T>)` encoder does not handle serialization of type `Object`. This
-   * adapter encodes the profile's result as `byte[]` rather than an `Object` to work around this.
-   */
-  private byte[] profileValue;
-
-  public ProfileMeasurementAdapter() {
-    // default constructor required for serialization in Spark
-  }
-
-  public ProfileMeasurementAdapter(ProfileMeasurement measurement) {
-    this.profileName = measurement.getProfileName();
-    this.entity = measurement.getEntity();
-    this.periodId = measurement.getPeriod().getPeriod();
-    this.durationMillis = measurement.getPeriod().getDurationMillis();
-    this.profileValue = SerDeUtils.toBytes(measurement.getProfileValue());
-  }
-
-  public ProfileMeasurement toProfileMeasurement() {
-    ProfilePeriod period = ProfilePeriod.fromPeriodId(periodId, durationMillis, TimeUnit.MILLISECONDS);
-    ProfileMeasurement measurement = new ProfileMeasurement()
-            .withProfileName(profileName)
-            .withEntity(entity)
-            .withPeriod(period)
-            .withProfileValue(SerDeUtils.fromBytes(profileValue, Object.class));
-    return measurement;
-  }
-
-  public String getProfileName() {
-    return profileName;
-  }
-
-  public void setProfileName(String profileName) {
-    this.profileName = profileName;
-  }
-
-  public String getEntity() {
-    return entity;
-  }
-
-  public void setEntity(String entity) {
-    this.entity = entity;
-  }
-
-  public Long getPeriodId() {
-    return periodId;
-  }
-
-  public void setPeriodId(Long periodId) {
-    this.periodId = periodId;
-  }
-
-  public Long getDurationMillis() {
-    return durationMillis;
-  }
-
-  public void setDurationMillis(Long durationMillis) {
-    this.durationMillis = durationMillis;
-  }
-
-  public byte[] getProfileValue() {
-    return profileValue;
-  }
-
-  public void setProfileValue(byte[] profileValue) {
-    this.profileValue = profileValue;
-  }
-
-  public void setProfileValue(Object profileValue) {
-    this.profileValue = SerDeUtils.toBytes(profileValue);
-  }
-}
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
index cfabd94..afb7241 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
@@ -32,7 +32,6 @@ import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
 import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
 import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +55,7 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATI
 /**
  * Writes the profile measurements to HBase in Spark.
  */
-public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasurementAdapter, Integer> {
+public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasurement, Integer> {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -108,20 +107,19 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
    * @return The number of measurements written to HBase.
    */
   @Override
-  public Iterator<Integer> call(Iterator<ProfileMeasurementAdapter> iterator) throws Exception {
+  public Iterator<Integer> call(Iterator<ProfileMeasurement> iterator) throws Exception {
     int count = 0;
     LOG.debug("About to write profile measurement(s) to HBase");
 
     // do not open hbase connection, if nothing to write
-    List<ProfileMeasurementAdapter> measurements = IteratorUtils.toList(iterator);
+    List<ProfileMeasurement> measurements = IteratorUtils.toList(iterator);
     if(measurements.size() > 0) {
 
       // open an HBase connection
       Configuration config = HBaseConfiguration.create();
       try (HBaseClient client = new HBaseClient(tableProvider, config, tableName)) {
 
-        for (ProfileMeasurementAdapter adapter : measurements) {
-          ProfileMeasurement m = adapter.toProfileMeasurement();
+        for (ProfileMeasurement m : measurements) {
           client.addMutation(rowKeyBuilder.rowKey(m), columnBuilder.columns(m), durability);
         }
         count = client.mutate();
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
index 7283b48..e052397 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunction.java
@@ -23,7 +23,6 @@ import org.apache.metron.profiler.DefaultMessageDistributor;
 import org.apache.metron.profiler.MessageDistributor;
 import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.slf4j.Logger;
@@ -50,7 +49,7 @@ import static org.apache.metron.profiler.spark.function.GroupByPeriodFunction.pr
 /**
  * The function responsible for building profiles in Spark.
  */
-public class ProfileBuilderFunction implements MapGroupsFunction<String, MessageRoute, ProfileMeasurementAdapter>  {
+public class ProfileBuilderFunction implements MapGroupsFunction<String, MessageRoute, ProfileMeasurement>  {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -74,7 +73,7 @@ public class ProfileBuilderFunction implements MapGroupsFunction<String, Message
    * @return
    */
   @Override
-  public ProfileMeasurementAdapter call(String group, Iterator<MessageRoute> iterator) {
+  public ProfileMeasurement call(String group, Iterator<MessageRoute> iterator) {
     // create the distributor; some settings are unnecessary because it is cleaned-up immediately after processing the batch
     int maxRoutes = Integer.MAX_VALUE;
     long profileTTLMillis = Long.MAX_VALUE;
@@ -93,13 +92,12 @@ public class ProfileBuilderFunction implements MapGroupsFunction<String, Message
     }
 
     // flush the profile
-    ProfileMeasurementAdapter result;
+    ProfileMeasurement result;
     List<ProfileMeasurement> measurements = distributor.flush();
     if(measurements.size() == 1) {
-      ProfileMeasurement m = measurements.get(0);
-      result = new ProfileMeasurementAdapter(m);
+      result = measurements.get(0);
       LOG.debug("Profile measurement created; profile={}, entity={}, period={}, value={}",
-              m.getProfileName(), m.getEntity(), m.getPeriod().getPeriod(), m.getProfileValue());
+              result.getProfileName(), result.getEntity(), result.getPeriod().getPeriod(), result.getProfileValue());
 
     } else if(measurements.size() == 0) {
       String msg = format("No profile measurement can be calculated. Review the profile for bugs. profile=%s, entity=%s, period=%s",
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index 72fd283..1e9f32b 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -19,24 +19,23 @@
  */
 package org.apache.metron.profiler.spark;
 
-import com.google.common.collect.Maps;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.profiler.MessageRoute;
 import org.apache.metron.profiler.client.stellar.FixedLookback;
 import org.apache.metron.profiler.client.stellar.GetProfile;
 import org.apache.metron.profiler.client.stellar.WindowLookback;
-import org.apache.metron.profiler.spark.function.MessageRouterFunction;
+import org.apache.metron.statistics.StellarStatisticsFunctions;
 import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
 import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.ConversionFunctions;
+import org.apache.metron.stellar.dsl.functions.DataStructureFunctions;
+import org.apache.metron.stellar.dsl.functions.StringFunctions;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkException;
-import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
-import org.json.simple.JSONObject;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -46,9 +45,7 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -69,7 +66,6 @@ import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INP
 import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
 import static org.apache.metron.profiler.spark.reader.TelemetryReaders.ORC;
 import static org.apache.metron.profiler.spark.reader.TelemetryReaders.PARQUET;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -130,7 +126,11 @@ public class BatchProfilerIntegrationTest {
             new SimpleFunctionResolver()
                     .withClass(GetProfile.class)
                     .withClass(FixedLookback.class)
-                    .withClass(WindowLookback.class),
+                    .withClass(WindowLookback.class)
+                    .withClass(DataStructureFunctions.Length.class)
+                    .withClass(StringFunctions.GetFirst.class)
+                    .withClass(StellarStatisticsFunctions.Count.class)
+                    .withClass(StellarStatisticsFunctions.Sum.class),
             new Context.Builder()
                     .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
                     .build());
@@ -353,14 +353,14 @@ public class BatchProfilerIntegrationTest {
     *        "foreach": "ip_src_addr",
     *        "init": { "count": "STATS_INIT()" },
     *        "update": { "count" : "STATS_ADD(count, 1)" },
-    *        "result": "TO_INTEGER(STATS_COUNT(count))"
+    *        "result": "count"
     *      },
     *      {
     *        "profile": "total-count",
     *        "foreach": "'total'",
     *        "init": { "count": "STATS_INIT()" },
     *        "update": { "count": "STATS_ADD(count, 1)" },
-    *        "result": "TO_INTEGER(STATS_COUNT(count))"
+    *        "result": "count"
     *      },
     *      {
     *        "profile": "response-body-len",
@@ -368,7 +368,7 @@ public class BatchProfilerIntegrationTest {
     *        "foreach": "ip_src_addr",
     *        "init": { "len": "STATS_INIT()" },
     *        "update": { "len": "STATS_ADD(len, response_body_len)" },
-    *        "result": "TO_INTEGER(STATS_SUM(len))"
+    *        "result": "len"
     *       }
     *   ]
     * }
@@ -384,8 +384,36 @@ public class BatchProfilerIntegrationTest {
     BatchProfiler profiler = new BatchProfiler();
     profiler.run(spark, profilerProperties, getGlobals(), readerProperties, fromJSON(statsProfileJson));
 
-    // the profiles do the exact same counting, but using the STATS functions
-    validateProfiles();
+    /*
+     * The profiles here do the exact same counting as the other test cases, but use the STATS
+     * functions instead. Each profile stores a data sketch rather than a single numerical value.
+     * The sketch is then retrieved and used to calculate the expected counts as part of the test
+     * case validation.
+     */
+    
+    // the 'window' looks up to 5 hours before the max timestamp, which in the test data is around July 7, 2018
+    assign("maxTimestamp", "1530978728982L");
+    assign("window", "PROFILE_WINDOW('from 5 hours ago', maxTimestamp)");
+
+    // there are 26 messages where ip_src_addr = 192.168.66.1
+    assign("sketches","PROFILE_GET('count-by-ip', '192.168.66.1', window)");
+    assign("sketch", "GET_FIRST(sketches)");
+    assertTrue(execute("26 == STATS_COUNT(sketch)", Boolean.class));
+
+    // there are 74 messages where ip_src_addr = 192.168.138.158
+    assign("sketches","PROFILE_GET('count-by-ip', '192.168.138.158', window)");
+    assign("sketch", "GET_FIRST(sketches)");
+    assertTrue(execute("74 == STATS_COUNT(sketch)", Boolean.class));
+
+    // there are 100 messages in all
+    assign("sketches","PROFILE_GET('total-count', 'total', window)");
+    assign("sketch", "GET_FIRST(sketches)");
+    assertTrue(execute("100 == STATS_COUNT(sketch)", Boolean.class));
+
+    // check the sum of the `response_body_len` field
+    assign("sketches","PROFILE_GET('response-body-len', '192.168.138.158', window)");
+    assign("sketch", "GET_FIRST(sketches)");
+    assertTrue(execute("1029726 == STATS_SUM(sketch)", Boolean.class));
   }
 
   /**
@@ -427,6 +455,7 @@ public class BatchProfilerIntegrationTest {
    */
   private void assign(String var, String expression) {
     executor.assign(var, expression, Collections.emptyMap());
+    LOG.debug("{} = {}", var, executor.getState().get(var));
   }
 
   /**
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
index 55f3e21..a54c9bf 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
@@ -23,7 +23,7 @@ import org.apache.commons.collections4.IteratorUtils;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
+import org.apache.metron.statistics.OnlineStatisticsProvider;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
@@ -61,7 +61,7 @@ public class HBaseWriterFunctionTest {
     ProfileConfig profile = getProfile();
 
     // setup the profile measurements that will be written
-    List<ProfileMeasurementAdapter> measurements = createMeasurements(1, entity, timestamp, profile);
+    List<ProfileMeasurement> measurements = createMeasurements(1, entity, timestamp, profile);
 
     // setup the function to test
     HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
@@ -85,7 +85,7 @@ public class HBaseWriterFunctionTest {
     ProfileConfig profile = getProfile();
 
     // setup the profile measurements that will be written
-    List<ProfileMeasurementAdapter> measurements = createMeasurements(10, entity, timestamp, profile);
+    List<ProfileMeasurement> measurements = createMeasurements(10, entity, timestamp, profile);
 
     // setup the function to test
     HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
@@ -104,7 +104,7 @@ public class HBaseWriterFunctionTest {
   public void testWriteNone() throws Exception {
 
     // there are no profile measurements to write
-    List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
+    List<ProfileMeasurement> measurements = new ArrayList<>();
 
     // setup the function to test
     HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
@@ -128,17 +128,15 @@ public class HBaseWriterFunctionTest {
    * @param profile The profile definition.
    * @return
    */
-  private List<ProfileMeasurementAdapter> createMeasurements(int count, String entity, long timestamp, ProfileConfig profile) {
-    List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
+  private List<ProfileMeasurement> createMeasurements(int count, String entity, long timestamp, ProfileConfig profile) {
+    List<ProfileMeasurement> measurements = new ArrayList<>();
 
     for(int i=0; i<count; i++) {
-      ProfileMeasurement measurement = new ProfileMeasurement()
+      measurements.add(new ProfileMeasurement()
               .withProfileName(profile.getProfile())
               .withEntity(entity)
-              .withPeriod(timestamp, 15, TimeUnit.MINUTES);
-
-      // wrap the measurement using the adapter
-      measurements.add(new ProfileMeasurementAdapter(measurement));
+              .withPeriod(timestamp, 15, TimeUnit.MINUTES)
+              .withProfileValue(new OnlineStatisticsProvider()));
     }
 
     return measurements;
@@ -171,6 +169,5 @@ public class HBaseWriterFunctionTest {
             .withForeach("ip_src_addr")
             .withUpdate("count", "count + 1")
             .withResult("count");
-
   }
 }
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
index 9e1e0b3..9e2a37f 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/ProfileBuilderFunctionTest.java
@@ -22,8 +22,8 @@ package org.apache.metron.profiler.spark.function;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.profiler.MessageRoute;
+import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.ProfilePeriod;
-import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
@@ -75,13 +75,13 @@ public class ProfileBuilderFunctionTest {
 
     // build the profile
     ProfileBuilderFunction function = new ProfileBuilderFunction(profilerProperties, getGlobals());
-    ProfileMeasurementAdapter measurement = function.call("profile1-192.168.1.1-0", routes.iterator());
+    ProfileMeasurement measurement = function.call("profile1-192.168.1.1-0", routes.iterator());
 
     // validate the measurement
     Assert.assertEquals(entity, measurement.getEntity());
     Assert.assertEquals(profile.getProfile(), measurement.getProfileName());
-    Assert.assertEquals(routes.size(), measurement.toProfileMeasurement().getProfileValue());
-    Assert.assertEquals(expectedPeriod.getPeriod(), (long) measurement.getPeriodId());
+    Assert.assertEquals(routes.size(), measurement.getProfileValue());
+    Assert.assertEquals(expectedPeriod.getPeriod(), (long) measurement.getPeriod().getPeriod());
   }
 
   /**
@@ -114,7 +114,7 @@ public class ProfileBuilderFunctionTest {
 
     // an exception should be thrown, if there is a bug in the profile definition
     ProfileBuilderFunction function = new ProfileBuilderFunction(profilerProperties, getGlobals());
-    ProfileMeasurementAdapter measurement = function.call("profile1-192.168.1.1-0", routes.iterator());
+    function.call("profile1-192.168.1.1-0", routes.iterator());
   }
 
   private JSONObject getMessage() {