You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/12/18 14:00:13 UTC
[metron] branch master updated: METRON-1925 Provide Verbose View of
Profile Results in REPL (nickwallen) closes apache/metron#1292
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 2e6caa9 METRON-1925 Provide Verbose View of Profile Results in REPL (nickwallen) closes apache/metron#1292
2e6caa9 is described below
commit 2e6caa908c0b7708281d76b6744d63706fe610b2
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Tue Dec 18 08:59:08 2018 -0500
METRON-1925 Provide Verbose View of Profile Results in REPL (nickwallen) closes apache/metron#1292
---
.../profiler/client/HBaseProfilerClient.java | 133 ++++++------
.../metron/profiler/client/ProfilerClient.java | 36 ++--
.../metron/profiler/client/stellar/GetProfile.java | 66 +++---
.../client/stellar/ProfilerClientConfig.java | 1 +
.../metron/profiler/client/stellar/Util.java | 16 +-
.../profiler/client/stellar/VerboseProfile.java | 222 +++++++++++++++++++++
.../profiler/client/HBaseProfilerClientTest.java | 167 +++++-----------
.../metron/profiler/client/ProfileWriter.java | 20 +-
.../profiler/client/stellar/GetProfileTest.java | 38 ++--
.../client/stellar/VerboseProfileTest.java | 220 ++++++++++++++++++++
.../org/apache/metron/profiler/ProfilePeriod.java | 3 +
metron-stellar/stellar-common/README.md | 12 +-
12 files changed, 673 insertions(+), 261 deletions(-)
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
index de2d42c..2e537da 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -24,18 +24,17 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.common.utils.SerDeUtils;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
* The default implementation of a ProfilerClient that fetches profile data persisted in HBase.
@@ -57,30 +56,16 @@ public class HBaseProfilerClient implements ProfilerClient {
*/
private ColumnBuilder columnBuilder;
- public HBaseProfilerClient(HTableInterface table, RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder) {
+ private long periodDurationMillis;
+
+ public HBaseProfilerClient(HTableInterface table,
+ RowKeyBuilder rowKeyBuilder,
+ ColumnBuilder columnBuilder,
+ long periodDurationMillis) {
setTable(table);
setRowKeyBuilder(rowKeyBuilder);
setColumnBuilder(columnBuilder);
- }
-
- /**
- * Fetches all of the data values associated with a Profile.
- *
- * @param clazz The type of values stored by the profile.
- * @param profile The name of the profile.
- * @param entity The name of the entity.
- * @param groups The groups used to sort the profile data.
- * @param durationAgo How far in the past to fetch values from.
- * @param unit The time unit of 'durationAgo'.
- * @param defaultValue The default value to specify. If empty, the result will be sparse.
- * @param <T> The type of values stored by the Profile.
- * @return A list of values.
- */
- @Override
- public <T> List<T> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, long durationAgo, TimeUnit unit, Optional<T> defaultValue) {
- long end = System.currentTimeMillis();
- long start = end - unit.toMillis(durationAgo);
- return fetch(clazz, profile, entity, groups, start, end, defaultValue);
+ this.periodDurationMillis = periodDurationMillis;
}
/**
@@ -97,21 +82,15 @@ public class HBaseProfilerClient implements ProfilerClient {
* @return A list of values.
*/
@Override
- public <T> List<T> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, long start, long end, Optional<T> defaultValue) {
- byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
- byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
-
- // find all the row keys that satisfy this fetch
- List<byte[]> keysToFetch = rowKeyBuilder.rowKeys(profile, entity, groups, start, end);
-
- // create a Get for each of the row keys
- List<Get> gets = keysToFetch
- .stream()
- .map(k -> new Get(k).addColumn(columnFamily, columnQualifier))
- .collect(Collectors.toList());
-
- // get the 'gets'
- return get(gets, columnQualifier, columnFamily, clazz, defaultValue);
+ public <T> List<ProfileMeasurement> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, long start, long end, Optional<T> defaultValue) {
+ List<ProfilePeriod> periods = ProfilePeriod.visitPeriods(
+ start,
+ end,
+ periodDurationMillis,
+ TimeUnit.MILLISECONDS,
+ Optional.empty(),
+ period -> period);
+ return fetch(clazz, profile, entity, groups, periods, defaultValue);
}
/**
@@ -126,48 +105,55 @@ public class HBaseProfilerClient implements ProfilerClient {
* @return A list of values.
*/
@Override
- public <T> List<T> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods, Optional<T> defaultValue) {
- byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
- byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
-
- // find all the row keys that satisfy this fetch
- List<byte[]> keysToFetch = rowKeyBuilder.rowKeys(profile, entity, groups, periods);
-
- // create a Get for each of the row keys
- List<Get> gets = keysToFetch
- .stream()
- .map(k -> new Get(k).addColumn(columnFamily, columnQualifier))
- .collect(Collectors.toList());
+ public <T> List<ProfileMeasurement> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods, Optional<T> defaultValue) {
+ // create a list of profile measurements that need to be fetched
+ List<ProfileMeasurement> toFetch = new ArrayList<>();
+ for(ProfilePeriod period: periods) {
+ toFetch.add(new ProfileMeasurement()
+ .withProfileName(profile)
+ .withEntity(entity)
+ .withPeriod(period)
+ .withGroups(groups));
+ }
- // get the 'gets'
- return get(gets, columnQualifier, columnFamily, clazz, defaultValue);
+ // retrieve the measurement values from HBase
+ return doFetch(toFetch, clazz, defaultValue);
}
- /**
- * Submits multiple Gets to HBase and deserialize the results.
- *
- * @param gets The gets to submit to HBase.
- * @param columnQualifier The column qualifier.
- * @param columnFamily The column family.
- * @param clazz The type expected in return.
- * @param defaultValue The default value to specify. If empty, the result will be sparse.
- * @param <T> The type expected in return.
- * @return
- */
- private <T> List<T> get(List<Get> gets, byte[] columnQualifier, byte[] columnFamily, Class<T> clazz, Optional<T> defaultValue) {
- List<T> values = new ArrayList<>();
+ private <T> List<ProfileMeasurement> doFetch(List<ProfileMeasurement> measurements, Class<T> clazz, Optional<T> defaultValue) {
+ List<ProfileMeasurement> values = new ArrayList<>();
+
+ // build the gets for HBase
+ byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
+ byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
+ List<Get> gets = new ArrayList<>();
+ for(ProfileMeasurement measurement: measurements) {
+ byte[] rowKey = rowKeyBuilder.rowKey(measurement);
+ Get get = new Get(rowKey).addColumn(columnFamily, columnQualifier);
+ gets.add(get);
+ }
+ // query HBase
try {
Result[] results = table.get(gets);
- for(int i = 0;i < results.length;++i) {
+ for(int i = 0; i < results.length; ++i) {
Result result = results[i];
+ ProfileMeasurement measurement = measurements.get(i);
+
boolean exists = result.containsColumn(columnFamily, columnQualifier);
- if(!exists && defaultValue.isPresent()) {
- values.add(defaultValue.get());
- }
- else if(exists) {
- byte[] val = result.getValue(columnFamily, columnQualifier);
- values.add(SerDeUtils.fromBytes(val, clazz));
+ if(exists) {
+ // value found
+ byte[] value = result.getValue(columnFamily, columnQualifier);
+ measurement.withProfileValue(SerDeUtils.fromBytes(value, clazz));
+ values.add(measurement);
+
+ } else if(defaultValue.isPresent()) {
+ // no value found, use default value provided
+ measurement.withProfileValue(defaultValue.get());
+ values.add(measurement);
+
+ } else {
+ // no value found and no default provided. nothing to do
}
}
} catch(IOException e) {
@@ -177,6 +163,7 @@ public class HBaseProfilerClient implements ProfilerClient {
return values;
}
+
public void setTable(HTableInterface table) {
this.table = table;
}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
index bab4ec9..161575f 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
@@ -20,11 +20,11 @@
package org.apache.metron.profiler.client;
+import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
/**
* An interface for a client capable of retrieving the profile data that has been persisted by the Profiler.
@@ -32,21 +32,6 @@ import java.util.concurrent.TimeUnit;
public interface ProfilerClient {
/**
- * Fetch the measurement values associated with a profile.
- *
- * @param clazz The type of values stored by the profile.
- * @param profile The name of the profile.
- * @param entity The name of the entity.
- * @param groups The groups used to sort the profile data.
- * @param durationAgo How far in the past to fetch values from.
- * @param unit The time unit of 'durationAgo'.
- * @param defaultValue The default value to specify. If empty, the result will be sparse.
- * @param <T> The type of values stored by the Profile.
- * @return A list of values.
- */
- <T> List<T> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, long durationAgo, TimeUnit unit, Optional<T> defaultValue);
-
- /**
* Fetch the values stored in a profile based on a start and end timestamp.
*
* @param clazz The type of values stored by the profile.
@@ -57,9 +42,15 @@ public interface ProfilerClient {
* @param end The end time in epoch milliseconds.
* @param defaultValue The default value to specify. If empty, the result will be sparse.
* @param <T> The type of values stored by the profile.
- * @return A list of values.
+ * @return A list of {@link ProfileMeasurement} values.
*/
- <T> List<T> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, long start, long end, Optional<T> defaultValue);
+ <T> List<ProfileMeasurement> fetch(Class<T> clazz,
+ String profile,
+ String entity,
+ List<Object> groups,
+ long start,
+ long end,
+ Optional<T> defaultValue);
/**
* Fetch the values stored in a profile based on a set of period keys.
@@ -71,7 +62,12 @@ public interface ProfilerClient {
* @param periods The set of profile period keys
* @param defaultValue The default value to specify. If empty, the result will be sparse.
* @param <T> The type of values stored by the profile.
- * @return A list of values.
+ * @return A list of {@link ProfileMeasurement} values.
*/
- <T> List<T> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods, Optional<T> defaultValue);
+ <T> List<ProfileMeasurement> fetch(Class<T> clazz,
+ String profile,
+ String entity,
+ List<Object> groups,
+ Iterable<ProfilePeriod> periods,
+ Optional<T> defaultValue);
}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index 73cd5a1..a0e2bdf 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -20,31 +20,11 @@
package org.apache.metron.profiler.client.stellar;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
-import static org.apache.metron.profiler.client.stellar.Util.getArg;
-import static org.apache.metron.profiler.client.stellar.Util.getEffectiveConfig;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
-import org.apache.metron.stellar.dsl.Stellar;
-import org.apache.metron.stellar.dsl.StellarFunction;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.client.HBaseProfilerClient;
import org.apache.metron.profiler.client.ProfilerClient;
@@ -52,9 +32,32 @@ import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.client.stellar.Util.getArg;
+import static org.apache.metron.profiler.client.stellar.Util.getEffectiveConfig;
+import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationInMillis;
+
/**
* A Stellar function that can retrieve data contained within a Profile.
*
@@ -90,10 +93,9 @@ import org.slf4j.LoggerFactory;
params={
"profile - The name of the profile.",
"entity - The name of the entity.",
- "periods - The list of profile periods to grab. These are ProfilePeriod objects.",
- "groups_list - Optional, must correspond to the 'groupBy' list used in profile creation - List (in square brackets) of "+
- "groupBy values used to filter the profile. Default is the " +
- "empty list, meaning groupBy was not used when creating the profile.",
+ "periods - The list of profile periods to fetch. Use PROFILE_WINDOW or PROFILE_FIXED.",
+ "groups - Optional - The groups to retrieve. Must correspond to the 'groupBy' " +
+ "list used during profile creation. Defaults to an empty list, meaning no groups.",
"config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
"of the same name. Default is the empty Map, meaning no overrides."
},
@@ -169,16 +171,24 @@ public class GetProfile implements StellarFunction {
RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(effectiveConfig);
ColumnBuilder columnBuilder = getColumnBuilder(effectiveConfig);
HTableInterface table = getTable(effectiveConfig);
- client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
+ long periodDuration = getPeriodDurationInMillis(effectiveConfig);
+ client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDuration);
cachedConfigMap = effectiveConfig;
}
if(cachedConfigMap != null) {
defaultValue = ProfilerClientConfig.PROFILER_DEFAULT_VALUE.get(cachedConfigMap);
}
- return client.fetch(Object.class, profile, entity, groups, periods.orElse(new ArrayList<>(0)), Optional.ofNullable(defaultValue));
- }
+ List<ProfileMeasurement> measurements = client.fetch(Object.class, profile, entity, groups,
+ periods.orElse(new ArrayList<>(0)), Optional.ofNullable(defaultValue));
+ // return only the value of each profile measurement
+ List<Object> values = new ArrayList<>();
+ for(ProfileMeasurement m: measurements) {
+ values.add(m.getProfileValue());
+ }
+ return values;
+ }
/**
* Get the groups defined by the user.
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
index 9bbc29d..1715b23 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
@@ -62,6 +62,7 @@ public enum ProfilerClientConfig {
* The default value to be returned if a profile is not written for a given period for a profile and entity.
*/
PROFILER_DEFAULT_VALUE("profiler.default.value", null, Object.class);
+
String key;
Object defaultValue;
Class<?> valueType;
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
index 82c7fba..ea85c56 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
@@ -20,12 +20,15 @@
package org.apache.metron.profiler.client.stellar;
import static java.lang.String.format;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
@@ -78,7 +81,6 @@ public class Util {
validateCapabilities(context, required);
@SuppressWarnings("unchecked")
Map<String, Object> global = (Map<String, Object>) context.getCapability(GLOBAL_CONFIG).get();
-
Map<String, Object> result = new HashMap<>(6);
// extract the relevant parameters from global, the overrides and the defaults
@@ -99,7 +101,6 @@ public class Util {
return result;
}
-
/**
* Get an argument from a list of arguments.
* @param index The index within the list of arguments.
@@ -114,4 +115,15 @@ public class Util {
return ConversionUtils.convert(args.get(index), clazz);
}
+
+ public static long getPeriodDurationInMillis(Map<String, Object> global) {
+ long duration = PROFILER_PERIOD.get(global, Long.class);
+ LOG.debug("profiler client: {}={}", PROFILER_PERIOD, duration);
+
+ String configuredUnits = PROFILER_PERIOD_UNITS.get(global, String.class);
+ TimeUnit units = TimeUnit.valueOf(configuredUnits);
+ LOG.debug("profiler client: {}={}", PROFILER_PERIOD_UNITS, units);
+
+ return units.toMillis(duration);
+ }
}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java
new file mode 100644
index 0000000..9e857aa
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.apache.metron.profiler.client.HBaseProfilerClient;
+import org.apache.metron.profiler.client.ProfilerClient;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_DEFAULT_VALUE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.client.stellar.Util.getArg;
+import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationInMillis;
+import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
+
+/**
+ * A Stellar function that can retrieve profile measurements.
+ *
+ * PROFILE_VERBOSE
+ *
+ * Differs from PROFILE_GET by returning a map containing the profile name, entity, period id, period start,
+ * period end, value, and groups for each profile measurement.
+ *
+ * Retrieve all values for 'entity1' from 'profile1' over the past 4 hours.
+ *
+ * <code>PROFILE_VERBOSE('profile1', 'entity1', PROFILE_WINDOW(4, "HOURS"))</code>
+ *
+ * Retrieve all values for 'entity1' from 'profile1' that occurred on 'weekdays' over the past month.
+ *
+ * <code>PROFILE_VERBOSE('profile1', 'entity1', PROFILE_WINDOW(1, "MONTH"), ['weekdays'])</code>
+ */
+@Stellar(
+ namespace="PROFILE",
+ name="VERBOSE",
+ description="Retrieves a series of measurements from a stored profile. Returns a map containing the profile " +
+ "name, entity, period id, period start, period end for each profile measurement. Provides a more " +
+ "verbose view of each measurement than PROFILE_GET. See also PROFILE_GET, PROFILE_FIXED, PROFILE_WINDOW.",
+ params={
+ "profile - The name of the profile.",
+ "entity - The name of the entity.",
+ "periods - The list of profile periods to fetch. Use PROFILE_WINDOW or PROFILE_FIXED.",
+ "groups - Optional, The groups to retrieve. Must correspond to the 'groupBy' " +
+ "list used during profile creation. Defaults to an empty list, meaning no groups. "
+ },
+ returns="A map for each profile measurement containing the profile name, entity, period, and value."
+)
+public class VerboseProfile implements StellarFunction {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ protected static String PROFILE_KEY = "profile";
+ protected static String ENTITY_KEY = "entity";
+ protected static String PERIOD_KEY = "period";
+ protected static String PERIOD_START_KEY = "period.start";
+ protected static String PERIOD_END_KEY = "period.end";
+ protected static String VALUE_KEY = "value";
+ protected static String GROUPS_KEY = "groups";
+ private ProfilerClient client;
+
+ @Override
+ public void initialize(Context context) {
+ // nothing to do
+ }
+
+ @Override
+ public boolean isInitialized() {
+ return true;
+ }
+
+ @Override
+ public Object apply(List<Object> args, Context context) throws ParseException {
+ // required arguments
+ String profile = getArg(0, String.class, args);
+ String entity = getArg(1, String.class, args);
+ List<ProfilePeriod> periods = getArg(2, List.class, args);
+
+ // optional 'groups' argument
+ List<Object> groups = new ArrayList<>();
+ if(args.size() >= 4) {
+ groups = getArg(3, List.class, args);
+ }
+
+ // get globals from the context
+ Map<String, Object> globals = (Map<String, Object>) context.getCapability(GLOBAL_CONFIG)
+ .orElse(Collections.emptyMap());
+
+ // lazily create the profiler client, if needed
+ if (client == null) {
+ RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(globals);
+ ColumnBuilder columnBuilder = getColumnBuilder(globals);
+ HTableInterface table = getTable(globals);
+ long periodDuration = getPeriodDurationInMillis(globals);
+ client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDuration);
+ }
+
+ // is there a default value?
+ Optional<Object> defaultValue = Optional.empty();
+ if(globals != null) {
+ defaultValue = Optional.ofNullable(PROFILER_DEFAULT_VALUE.get(globals));
+ }
+
+ List<ProfileMeasurement> measurements = client.fetch(Object.class, profile, entity, groups, periods, defaultValue);
+
+ // render a view of each profile measurement
+ List<Object> results = new ArrayList<>();
+ for(ProfileMeasurement measurement: measurements) {
+ results.add(render(measurement));
+ }
+ return results;
+ }
+
+ /**
+ * Renders a view of the profile measurement.
+ * @param measurement The profile measurement to render.
+ */
+ private Map<String, Object> render(ProfileMeasurement measurement) {
+ Map<String, Object> view = new HashMap<>();
+ view.put(PROFILE_KEY, measurement.getProfileName());
+ view.put(ENTITY_KEY, measurement.getEntity());
+ view.put(PERIOD_KEY, measurement.getPeriod().getPeriod());
+ view.put(PERIOD_START_KEY, measurement.getPeriod().getStartTimeMillis());
+ view.put(PERIOD_END_KEY, measurement.getPeriod().getEndTimeMillis());
+ view.put(VALUE_KEY, measurement.getProfileValue());
+ view.put(GROUPS_KEY, measurement.getGroups());
+ return view;
+ }
+
+ /**
+ * Creates the ColumnBuilder to use in accessing the profile data.
+ * @param global The global configuration.
+ */
+ private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
+ String columnFamily = PROFILER_COLUMN_FAMILY.get(global, String.class);
+ return new ValueOnlyColumnBuilder(columnFamily);
+ }
+
+ /**
+ * Creates the RowKeyBuilder to use in accessing the profile data.
+ * @param global The global configuration.
+ */
+ private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
+ Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
+ return new SaltyRowKeyBuilder(saltDivisor, getPeriodDurationInMillis(global), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Create an HBase table used when accessing HBase.
+ * @param global The global configuration.
+ * @return The table obtained from the configured TableProvider.
+ */
+ private HTableInterface getTable(Map<String, Object> global) {
+ String tableName = PROFILER_HBASE_TABLE.get(global, String.class);
+ TableProvider provider = getTableProvider(global);
+ try {
+ return provider.getTable(HBaseConfiguration.create(), tableName);
+
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Unable to access table: %s", tableName), e);
+ }
+ }
+
+ /**
+ * Create the TableProvider to use when accessing HBase.
+ * @param global The global configuration.
+ */
+ private TableProvider getTableProvider(Map<String, Object> global) {
+ String clazzName = PROFILER_HBASE_TABLE_PROVIDER.get(global, String.class);
+ TableProvider provider;
+ try {
+ @SuppressWarnings("unchecked")
+ Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(clazzName);
+ provider = clazz.getConstructor().newInstance();
+
+ } catch (Exception e) {
+ provider = new HTableProvider();
+ }
+
+ return provider;
+ }
+}
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
index 96c0d91..cc3748e 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -64,17 +64,16 @@ public class HBaseProfilerClientTest {
@Before
public void setup() throws Exception {
-
table = new MockHTable(tableName, columnFamily);
executor = new DefaultStellarStatefulExecutor();
- // used to write values to be read during testing
+ // writes values to be read during testing
+ long periodDurationMillis = periodUnits.toMillis(periodDuration);
RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
- profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
+ profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
- // what we're actually testing
- client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
+ client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDurationMillis);
}
@After
@@ -82,101 +81,55 @@ public class HBaseProfilerClientTest {
table.clear();
}
- /**
- * The client should be able to distinguish between groups and only fetch those in the correct group.
- */
@Test
- public void testFetchWithDurationAgoAndOneGroup() throws Exception {
+ public void Should_ReturnMeasurements_When_DataExistsForAGroup() throws Exception {
+ final String profile = "profile1";
+ final String entity = "entity1";
final int expectedValue = 2302;
final int hours = 2;
- final int count = hours * periodsPerHour;
+ final int count = hours * periodsPerHour + 1;
final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
// setup - write two groups of measurements - 'weekends' and 'weekdays'
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
+ ProfileMeasurement prototype = new ProfileMeasurement()
+ .withProfileName(profile)
+ .withEntity(entity)
.withPeriod(startTime, periodDuration, periodUnits);
+ profileWriter.write(prototype, count, Arrays.asList("weekdays"), val -> expectedValue);
+ profileWriter.write(prototype, count, Arrays.asList("weekends"), val -> 0);
- profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
- profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
-
- //valid results
+ long end = System.currentTimeMillis();
+ long start = end - TimeUnit.HOURS.toMillis(2);
{
- // execute
- List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", Arrays.asList("weekdays"), hours, TimeUnit.HOURS, Optional.empty());
-
- // validate
+ //validate "weekday" results
+ List<Object> groups = Arrays.asList("weekdays");
+ List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, groups, start, end, Optional.empty());
assertEquals(count, results.size());
- results.forEach(actual -> assertEquals(expectedValue, (int) actual));
+ results.forEach(actual -> {
+ assertEquals(profile, actual.getProfileName());
+ assertEquals(entity, actual.getEntity());
+ assertEquals(groups, actual.getGroups());
+ assertEquals(expectedValue, actual.getProfileValue());
+ });
}
- }
-
- /**
- * Attempt to fetch a group that does not exist.
- */
- @Test
- public void testFetchWithDurationAgoAndNoGroup() {
-
- // setup
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-
- // create two groups of measurements - one on weekdays and one on weekends
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, hours * periodsPerHour, Arrays.asList("weekdays"), val -> expectedValue);
- profileWriter.write(m, hours * periodsPerHour, Arrays.asList("weekends"), val -> 0);
-
- // execute
- List<Object> doesNotExist = Arrays.asList("does-not-exist");
{
- List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", doesNotExist, hours, TimeUnit.HOURS, Optional.empty());
-
- // validate
- assertEquals(0, results.size());
- }
- {
- //with a default value, we'd expect a bunch of 0's
- List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", doesNotExist, hours, TimeUnit.HOURS, Optional.of(0));
- //8 or 9 15 minute periods in 2 hours depending on when you start
- assertTrue(results.size() == 8 || results.size() == 9);
- results.forEach(actual -> assertEquals(0, (int) actual));
+ //validate "weekend" results
+ List<Object> groups = Arrays.asList("weekends");
+ List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, groups, start, end, Optional.empty());
+ assertEquals(count, results.size());
+ results.forEach(actual -> {
+ assertEquals(profile, actual.getProfileName());
+ assertEquals(entity, actual.getEntity());
+ assertEquals(groups, actual.getGroups());
+ assertEquals(0, actual.getProfileValue());
+ });
}
}
- /**
- * Profile data only within 'milliseconds ago' should be fetched. Data outside of that time horizon should
- * not be fetched.
- */
- @Test
- public void testFetchWithDurationAgoAndOutsideTimeWindow() throws Exception {
- final int hours = 2;
- final List<Object> group = Arrays.asList("weekends");
- final long startTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
-
- // setup - write some values to read later
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, hours * periodsPerHour, group, val -> 1000);
-
- // execute
- List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", group, 2, TimeUnit.MILLISECONDS, Optional.empty());
-
- // validate - there should NOT be any results from just 2 milliseconds ago
- assertEquals(0, results.size());
- }
-
- /**
- * The client should be able to distinguish between groups and only fetch those in the correct group.
- */
@Test
- public void testFetchWithStartEndAndOneGroup() throws Exception {
+ public void Should_ReturnResultFromGroup_When_MultipleGroupsExist() throws Exception {
+ final String profile = "profile1";
+ final String entity = "entity1";
final int periodsPerHour = 4;
final int expectedValue = 2302;
final int hours = 2;
@@ -192,21 +145,18 @@ public class HBaseProfilerClientTest {
profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
- // execute
- List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", Arrays.asList("weekdays"), startTime, endTime, Optional.empty());
+ List<Object> weekdays = Arrays.asList("weekdays");
+ List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, weekdays, startTime, endTime, Optional.empty());
- // validate
+ // should only return results from 'weekdays' group
assertEquals(count, results.size());
- results.forEach(actual -> assertEquals(expectedValue, (int) actual));
+ results.forEach(actual -> assertEquals(weekdays, actual.getGroups()));
}
- /**
- * Attempt to fetch a group that does not exist.
- */
@Test
- public void testFetchWithStartEndAndNoGroup() {
-
- // setup
+ public void Should_ReturnNoResults_When_GroupDoesNotExist() {
+ final String profile = "profile1";
+ final String entity = "entity1";
final int periodsPerHour = 4;
final int expectedValue = 2302;
final int hours = 2;
@@ -222,39 +172,32 @@ public class HBaseProfilerClientTest {
profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
- // execute
- List<Object> doesNotExist = Arrays.asList("does-not-exist");
- List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", doesNotExist, startTime, endTime, Optional.empty());
-
- // validate
+ // should return no results when the group does not exist
+ List<Object> groups = Arrays.asList("does-not-exist");
+ List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, groups, startTime, endTime, Optional.empty());
assertEquals(0, results.size());
}
- /**
- * Profile data only within 'milliseconds ago' should be fetched. Data outside of that time horizon should
- * not be fetched.
- */
@Test
- public void testFetchWithStartEndAndOutsideTimeWindow() throws Exception {
-
+ public void Should_ReturnNoResults_When_NoDataInStartToEnd() throws Exception {
+ final String profile = "profile1";
+ final String entity = "entity1";
final int hours = 2;
int numberToWrite = hours * periodsPerHour;
final List<Object> group = Arrays.asList("weekends");
final long measurementTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
- // setup - write some values to read later
- ProfileMeasurement m = new ProfileMeasurement()
+ // write some data with a timestamp of 1 day ago
+ ProfileMeasurement prototype = new ProfileMeasurement()
.withProfileName("profile1")
.withEntity("entity1")
.withPeriod(measurementTime, periodDuration, periodUnits);
- profileWriter.write(m, numberToWrite, group, val -> 1000);
+ profileWriter.write(prototype, numberToWrite, group, val -> 1000);
- // execute
+ // should return no results when [start,end] is long after when test data was written
final long endFetchAt = System.currentTimeMillis();
final long startFetchAt = endFetchAt - TimeUnit.MILLISECONDS.toMillis(30);
- List<Integer> results = client.fetch(Integer.class, "profile1", "entity1", group, startFetchAt, endFetchAt, Optional.empty());
-
- // validate - there should NOT be any results from just 2 milliseconds ago
+ List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, group, startFetchAt, endFetchAt, Optional.empty());
assertEquals(0, results.size());
}
}
\ No newline at end of file
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index 317227b..38f1c3e 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -50,11 +50,11 @@ public class ProfileWriter {
private HBaseClient hbaseClient;
private HBaseProfilerClient client;
- public ProfileWriter(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder, HTableInterface table) {
+ public ProfileWriter(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder, HTableInterface table, long periodDurationMillis) {
this.rowKeyBuilder = rowKeyBuilder;
this.columnBuilder = columnBuilder;
this.hbaseClient = new HBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString());
- this.client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
+ this.client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDurationMillis);
}
/**
@@ -68,21 +68,22 @@ public class ProfileWriter {
public void write(ProfileMeasurement prototype, int count, List<Object> group, Function<Object, Object> valueGenerator) {
ProfileMeasurement m = prototype;
+ ProfilePeriod period = m.getPeriod();
for(int i=0; i<count; i++) {
-
// generate the next value that should be written
Object nextValue = valueGenerator.apply(m.getProfileValue());
- // create a measurement for the next profile period to be written
- ProfilePeriod next = m.getPeriod().next();
+ // write the measurement
m = new ProfileMeasurement()
.withProfileName(prototype.getProfileName())
.withEntity(prototype.getEntity())
- .withPeriod(next.getStartTimeMillis(), prototype.getPeriod().getDurationMillis(), TimeUnit.MILLISECONDS)
+ .withPeriod(period)
.withGroups(group)
.withProfileValue(nextValue);
-
write(m);
+
+ // advance to the next period
+ period = m.getPeriod().next();
}
}
@@ -111,13 +112,14 @@ public class ProfileWriter {
HTableProvider provider = new HTableProvider();
HTableInterface table = provider.getTable(config, "profiler");
+ long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
long when = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
ProfileMeasurement measure = new ProfileMeasurement()
.withProfileName("profile1")
.withEntity("192.168.66.121")
- .withPeriod(when, 15, TimeUnit.MINUTES);
+ .withPeriod(when, periodDurationMillis, TimeUnit.MILLISECONDS);
- ProfileWriter writer = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
+ ProfileWriter writer = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
writer.write(measure, 2 * 24 * 4, Collections.emptyList(), val -> new Random().nextInt(10));
}
}
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
index 0eacb42..3fbed1c 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
@@ -22,33 +22,36 @@ package org.apache.metron.profiler.client.stellar;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.profiler.client.ProfileWriter;
-import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
-import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
-import org.apache.metron.stellar.dsl.functions.resolver.SingletonFunctionResolver;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.client.stellar.FixedLookback;
-import org.apache.metron.profiler.client.stellar.GetProfile;
+import org.apache.metron.profiler.client.ProfileWriter;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
+import org.apache.metron.stellar.dsl.functions.resolver.SingletonFunctionResolver;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.concurrent.TimeUnit;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Collections;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.*;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
/**
* Tests the GetProfile class.
@@ -89,9 +92,10 @@ public class GetProfileTest {
final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, columnFamily);
// used to write values to be read during testing
+ long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
- profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
+ profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
// global properties
Map<String, Object> global = new HashMap<String, Object>() {{
@@ -176,7 +180,6 @@ public class GetProfileTest {
.withProfileName("profile1")
.withEntity("entity1")
.withPeriod(startTime, periodDuration, periodUnits);
-
profileWriter.write(m, count, group, val -> expectedValue);
// execute - read the profile values - no groups
@@ -186,6 +189,7 @@ public class GetProfileTest {
// validate - expect to read all values from the past 4 hours
Assert.assertEquals(count, result.size());
+ result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
}
/**
@@ -224,6 +228,7 @@ public class GetProfileTest {
// validate - expect to read all values from the past 4 hours
Assert.assertEquals(count, result.size());
+ result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
}
/**
@@ -262,6 +267,7 @@ public class GetProfileTest {
// validate - expect to read all values from the past 4 hours
Assert.assertEquals(count, result.size());
+ result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
}
/**
@@ -389,6 +395,7 @@ public class GetProfileTest {
// validate - expect to read all values from the past 4 hours
Assert.assertEquals(count, result.size());
+ result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
}
/**
@@ -446,5 +453,4 @@ public class GetProfileTest {
// validate - expect to fail to read any values
Assert.assertEquals(0, result.size());
}
-
}
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java
new file mode 100644
index 0000000..bd39007
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java
@@ -0,0 +1,220 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.client.ProfileWriter;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.client.stellar.VerboseProfile.ENTITY_KEY;
+import static org.apache.metron.profiler.client.stellar.VerboseProfile.GROUPS_KEY;
+import static org.apache.metron.profiler.client.stellar.VerboseProfile.PERIOD_END_KEY;
+import static org.apache.metron.profiler.client.stellar.VerboseProfile.PERIOD_KEY;
+import static org.apache.metron.profiler.client.stellar.VerboseProfile.PERIOD_START_KEY;
+import static org.apache.metron.profiler.client.stellar.VerboseProfile.PROFILE_KEY;
+import static org.apache.metron.profiler.client.stellar.VerboseProfile.VALUE_KEY;
+
+/**
+ * Tests the VerboseProfile class by evaluating PROFILE_VERBOSE Stellar expressions against a mock HBase table.
+ */
+public class VerboseProfileTest {
+ private static final long periodDuration = 15;
+ private static final TimeUnit periodUnits = TimeUnit.MINUTES;
+ private static final int saltDivisor = 1000;
+ private static final String tableName = "profiler";
+ private static final String columnFamily = "P";
+ private StellarStatefulExecutor executor;
+ private Map<String, Object> state; // variables passed to the executor with every expression
+ private ProfileWriter profileWriter;
+
+ private <T> T run(String expression, Class<T> clazz) { // evaluates a Stellar expression against 'state', returning the result as 'clazz'
+ return executor.execute(expression, state, clazz);
+ }
+
+ private Map<String, Object> globals; // global configuration supplied to Stellar as GLOBAL_CONFIG in setup()
+
+ @Before
+ public void setup() {
+ state = new HashMap<>();
+ final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, columnFamily);
+
+ // writes the measurements that each test later reads back through PROFILE_VERBOSE
+ long periodDurationMillis = TimeUnit.MINUTES.toMillis(15); // NOTE(review): repeats periodDuration/periodUnits (15 minutes); keep in sync
+ RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
+ ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+ profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
+
+ // global properties: table, column family, mock table provider, period and salt settings
+ globals = new HashMap<String, Object>() {{
+ put(PROFILER_HBASE_TABLE.getKey(), tableName);
+ put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+ put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+ put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
+ put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
+ put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
+ }};
+
+ // create the stellar execution environment with only the functions these tests call
+ executor = new DefaultStellarStatefulExecutor(
+ new SimpleFunctionResolver()
+ .withClass(VerboseProfile.class)
+ .withClass(FixedLookback.class),
+ new Context.Builder()
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals)
+ .build());
+ }
+
+ @Test
+ public void shouldReturnMeasurementsWhenNotGrouped() {
+ final int periodsPerHour = 4;
+ final int expectedValue = 2302;
+ final int hours = 2;
+ final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+ final List<Object> group = Collections.emptyList();
+
+ // setup - write some measurements to be read later
+ final int count = hours * periodsPerHour;
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
+ profileWriter.write(m, count, group, val -> expectedValue);
+
+ // expect every measurement written above (starting 2 hours ago) to fall within the 4 hour lookback
+ List<Map<String, Object>> results;
+ results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))", List.class); // raw List.class, so this assignment is unchecked
+ Assert.assertEquals(count, results.size());
+ for(Map<String, Object> actual: results) {
+ Assert.assertEquals("profile1", actual.get(PROFILE_KEY));
+ Assert.assertEquals("entity1", actual.get(ENTITY_KEY));
+ Assert.assertNotNull(actual.get(PERIOD_KEY));
+ Assert.assertNotNull(actual.get(PERIOD_START_KEY));
+ Assert.assertNotNull(actual.get(PERIOD_END_KEY));
+ Assert.assertNotNull(actual.get(GROUPS_KEY));
+ Assert.assertEquals(expectedValue, actual.get(VALUE_KEY));
+ }
+ }
+
+ @Test
+ public void shouldReturnMeasurementsWhenGrouped() {
+ final int periodsPerHour = 4;
+ final int expectedValue = 2302;
+ final int hours = 2;
+ final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+ final List<Object> group = Arrays.asList("weekends");
+
+ // setup - write some measurements to be read later
+ final int count = hours * periodsPerHour;
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
+ profileWriter.write(m, count, group, val -> expectedValue);
+
+ // expose the groups to the expression as the Stellar variable 'groups'
+ state.put("groups", group);
+
+ // expect every measurement written above for the group to fall within the 4 hour lookback
+ List<Map<String, Object>> results;
+ results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), groups)", List.class); // raw List.class, so this assignment is unchecked
+ Assert.assertEquals(count, results.size());
+ for(Map<String, Object> actual: results) {
+ Assert.assertEquals("profile1", actual.get(PROFILE_KEY));
+ Assert.assertEquals("entity1", actual.get(ENTITY_KEY));
+ Assert.assertNotNull(actual.get(PERIOD_KEY));
+ Assert.assertNotNull(actual.get(PERIOD_START_KEY));
+ Assert.assertNotNull(actual.get(PERIOD_END_KEY));
+ Assert.assertNotNull(actual.get(GROUPS_KEY));
+ Assert.assertEquals(expectedValue, actual.get(VALUE_KEY));
+ }
+ }
+
+ @Test
+ public void shouldReturnNothingWhenNoMeasurementsExist() {
+ final int expectedValue = 2302;
+ final int hours = 2;
+ final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+ final List<Object> group = Collections.emptyList();
+
+ // setup - write a single value from 2 hours ago
+ ProfileMeasurement m = new ProfileMeasurement()
+ .withProfileName("profile1")
+ .withEntity("entity1")
+ .withPeriod(startTime, periodDuration, periodUnits);
+ profileWriter.write(m, 1, group, val -> expectedValue);
+
+ // expect to get NO measurements over the past 4 seconds; the only measurement is 2 hours old
+ List<Map<String, Object>> result;
+ result = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'SECONDS'))", List.class);
+ Assert.assertEquals(0, result.size());
+ }
+
+ @Test
+ public void shouldReturnDefaultValueWhenNoMeasurementsExist() {
+ // set a default value
+ String defaultVal = "this is the default value";
+ globals.put("profiler.default.value", defaultVal);
+
+ // nothing is written by this test, so no measurements are expected to exist for the lookback
+ String expr = "PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
+ List<Map<String, Object>> results = run(expr, List.class);
+
+ // expect one default-valued entry per period: 16 or 17 15-minute periods in 4 hours, presumably depending on where 'now' falls within a period
+ Assert.assertTrue(results.size() == 16 || results.size() == 17);
+ for(Map<String, Object> actual: results) {
+ Assert.assertEquals("profile1", actual.get(PROFILE_KEY));
+ Assert.assertEquals("entity1", actual.get(ENTITY_KEY));
+ Assert.assertNotNull(actual.get(PERIOD_KEY));
+ Assert.assertNotNull(actual.get(PERIOD_START_KEY));
+ Assert.assertNotNull(actual.get(PERIOD_END_KEY));
+ Assert.assertNotNull(actual.get(GROUPS_KEY));
+
+ // expect the default value
+ Assert.assertEquals(defaultVal, actual.get(VALUE_KEY));
+ }
+
+ }
+}
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
index 02a4932..81de004 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
@@ -21,6 +21,7 @@
package org.apache.metron.profiler;
import java.io.Serializable;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -151,6 +152,8 @@ public class ProfilePeriod implements Serializable {
return "ProfilePeriod{" +
"period=" + period +
", durationMillis=" + durationMillis +
+ ", startTime=" + Instant.ofEpochMilli(getStartTimeMillis()).toString() +
+ ", endTime=" + Instant.ofEpochMilli(getEndTimeMillis()).toString() +
'}';
}
diff --git a/metron-stellar/stellar-common/README.md b/metron-stellar/stellar-common/README.md
index 77877b9..febb1db 100644
--- a/metron-stellar/stellar-common/README.md
+++ b/metron-stellar/stellar-common/README.md
@@ -248,6 +248,7 @@ Where:
| [ `OBJECT_GET`](#object_get) |
| [ `PREPEND_IF_MISSING`](#prepend_if_missing) |
| [ `PROFILE_GET`](#profile_get) |
+| [ `PROFILE_VERBOSE`](#profile_verbose) |
| [ `PROFILE_FIXED`](#profile_fixed) |
| [ `PROFILE_WINDOW`](#profile_window) |
| [ `PROTOCOL_TO_NAME`](#protocol_to_name) |
@@ -866,11 +867,20 @@ Where:
* Input:
* profile - The name of the profile.
* entity - The name of the entity.
- * periods - The list of profile periods to grab. These are ProfilePeriod objects.
+ * periods - The list of profile periods to fetch. Use PROFILE_WINDOW or PROFILE_FIXED.
* groups_list - Optional, must correspond to the 'groupBy' list used in profile creation - List (in square brackets) of groupBy values used to filter the profile. Default is the empty list, meaning groupBy was not used when creating the profile.
* config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter of the same name. Default is the empty Map, meaning no overrides.
* Returns: The selected profile measurements.
+### `PROFILE_VERBOSE`
+ * Description: Retrieves a series of measurements from a stored profile. Returns a list of maps, one per profile measurement, each containing the profile name, entity, groups, period id, period start, period end, and value. Provides a more verbose view of each measurement than PROFILE_GET.
+ * Input:
+ * profile - The name of the profile.
+ * entity - The name of the entity.
+ * periods - The list of profile periods to fetch. Use PROFILE_WINDOW or PROFILE_FIXED.
+ * groups - Optional - The groups to retrieve. Must correspond to the 'groupBy' used during profile creation. Defaults to an empty list, meaning no groups.
+ * Returns: The selected profile measurements.
+
### `PROFILE_FIXED`
* Description: The profile periods associated with a fixed lookback starting from now
* Input: