Posted to commits@metron.apache.org by ce...@apache.org on 2017/02/06 20:17:38 UTC
[10/17] incubator-metron git commit: METRON-684 Decouple Timestamp
calculation from PROFILE_GET (cestella via ottobackwards) closes
apache/incubator-metron#435
METRON-684 Decouple Timestamp calculation from PROFILE_GET (cestella via ottobackwards) closes apache/incubator-metron#435
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/57c38af1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/57c38af1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/57c38af1
Branch: refs/heads/Metron_0.3.1
Commit: 57c38af1c014a8c9158c51b6d5f9042536b59047
Parents: 84a36a6
Author: cestella <ce...@gmail.com>
Authored: Sun Feb 5 09:30:58 2017 -0500
Committer: Otto Fowler <ot...@apache.org>
Committed: Sun Feb 5 09:30:58 2017 -0500
----------------------------------------------------------------------
.../metron-profiler-client/README.md | 18 +-
.../profiler/client/HBaseProfilerClient.java | 29 +++
.../metron/profiler/client/ProfilerClient.java | 15 ++
.../profiler/client/stellar/FixedLookback.java | 74 +++++++
.../profiler/client/stellar/GetProfile.java | 208 ++-----------------
.../profiler/client/stellar/ProfilerConfig.java | 104 ++++++++++
.../metron/profiler/client/stellar/Util.java | 118 +++++++++++
.../metron/profiler/client/GetProfileTest.java | 75 +++----
.../apache/metron/profiler/ProfilePeriod.java | 27 ++-
.../metron/profiler/hbase/RowKeyBuilder.java | 16 ++
.../profiler/hbase/SaltyRowKeyBuilder.java | 76 +++++--
metron-analytics/metron-profiler/README.md | 6 +-
metron-analytics/metron-statistics/README.md | 4 +-
metron-platform/metron-common/README.md | 12 +-
14 files changed, 533 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-client/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/README.md b/metron-analytics/metron-profiler-client/README.md
index 105fce9..60779c8 100644
--- a/metron-analytics/metron-profiler-client/README.md
+++ b/metron-analytics/metron-profiler-client/README.md
@@ -29,8 +29,7 @@ The Stellar client consists of the `PROFILE_GET` command, which takes the follow
REQUIRED:
profile - The name of the profile
entity - The name of the entity
- durationAgo - How long ago should values be retrieved from?
- units - The units of 'durationAgo'
+ periods - The list of profile periods to grab. These are ProfilePeriod objects.
OPTIONAL:
groups_list - Optional, must correspond to the 'groupBy' list used in profile creation - List (in square brackets) of
groupBy values used to filter the profile. Default is the empty list, meaning groupBy was not used when
@@ -40,6 +39,21 @@ OPTIONAL:
```
There is an older calling format where `groups_list` is specified as a sequence of group names, "varargs" style, instead of a List object. This format is still supported for backward compatibility, but it is deprecated, and it is disallowed if the optional `config_overrides` argument is used.
+The `periods` argument is typically the output of another Stellar function that defines the time periods to include.
+
+`PROFILE_FIXED`: The profiler periods associated with a fixed lookback starting from now. These are ProfilePeriod objects.
+```
+REQUIRED:
+ durationAgo - How long ago should values be retrieved from?
+ units - The units of 'durationAgo'.
+OPTIONAL:
+ config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter
+ of the same name. Default is the empty Map, meaning no overrides.
+
+e.g. To retrieve all the profile values for the last 5 hours: PROFILE_GET('profile', 'entity', PROFILE_FIXED(5, 'HOURS'))
+```
+
+
### Groups_list argument
The `groups_list` argument in the client must exactly correspond to the [`groupBy`](../metron-profiler#groupby) configuration in the profile definition. If `groupBy` was not used in the profile, `groups_list` must be empty in the client. If `groupBy` was used in the profile, then the client `groups_list` is <b>not</b> optional; it must be the same length as the `groupBy` list, and specify exactly one selected group value for each `groupBy` criterion, in the same order. For example:
```
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
index 42df6c2..7c4ec84 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.common.utils.SerDeUtils;
@@ -111,6 +112,34 @@ public class HBaseProfilerClient implements ProfilerClient {
}
/**
+ * Fetch the values stored in a profile based on a set of profile periods.
+ *
+ * @param clazz The type of values stored by the profile.
+ * @param profile The name of the profile.
+ * @param entity The name of the entity.
+ * @param groups The groups used to sort the profile data.
+ * @param periods The set of profile measurement periods
+ * @return A list of values.
+ */
+ @Override
+ public <T> List<T> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
+ byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
+ byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
+
+ // find all the row keys that satisfy this fetch
+ List<byte[]> keysToFetch = rowKeyBuilder.rowKeys(profile, entity, groups, periods);
+
+ // create a Get for each of the row keys
+ List<Get> gets = keysToFetch
+ .stream()
+ .map(k -> new Get(k).addColumn(columnFamily, columnQualifier))
+ .collect(Collectors.toList());
+
+ // submit the gets to HBase and deserialize the results
+ return get(gets, columnQualifier, columnFamily, clazz);
+ }
+
+ /**
* Submits multiple Gets to HBase and deserialize the results.
*
* @param gets The gets to submit to HBase.
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
index c6a5379..57b0e04 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
@@ -20,6 +20,8 @@
package org.apache.metron.profiler.client;
+import org.apache.metron.profiler.ProfilePeriod;
+
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -55,4 +57,17 @@ public interface ProfilerClient {
* @return A list of values.
*/
<T> List<T> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, long start, long end);
+
+ /**
+ * Fetch the values stored in a profile based on a set of period keys.
+ *
+ * @param clazz The type of values stored by the profile.
+ * @param profile The name of the profile.
+ * @param entity The name of the entity.
+ * @param groups The groups used to sort the profile data.
+ * @param periods The set of profile period keys
+ * @param <T> The type of values stored by the profile.
+ * @return A list of values.
+ */
+ <T> List<T> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
new file mode 100644
index 0000000..c4ed582
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/FixedLookback.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.Stellar;
+import org.apache.metron.common.dsl.StellarFunction;
+import org.apache.metron.profiler.ProfilePeriod;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@Stellar(
+ namespace="PROFILE",
+ name="FIXED",
+ description="The profiler periods associated with a fixed lookback starting from now.",
+ params={
+ "durationAgo - How long ago should values be retrieved from?",
+ "units - The units of 'durationAgo'.",
+ "config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
+ "of the same name. Default is the empty Map, meaning no overrides."
+ },
+ returns="The selected profile measurement periods. These are ProfilePeriod objects."
+)
+public class FixedLookback implements StellarFunction {
+
+ @Override
+ public Object apply(List<Object> args, Context context) throws ParseException {
+ Optional<Map> configOverridesMap = Optional.empty();
+ long durationAgo = Util.getArg(0, Long.class, args);
+ String unitsName = Util.getArg(1, String.class, args);
+ TimeUnit units = TimeUnit.valueOf(unitsName);
+ if(args.size() > 2) {
+ Map rawMap = Util.getArg(2, Map.class, args);
+ configOverridesMap = rawMap == null || rawMap.isEmpty() ? Optional.empty() : Optional.of(rawMap);
+ }
+ Map<String, Object> effectiveConfigs = Util.getEffectiveConfig(context, configOverridesMap.orElse(null));
+ Long tickDuration = ProfilerConfig.PROFILER_PERIOD.get(effectiveConfigs, Long.class);
+ TimeUnit tickUnit = TimeUnit.valueOf(ProfilerConfig.PROFILER_PERIOD_UNITS.get(effectiveConfigs, String.class));
+ long end = System.currentTimeMillis();
+ long start = end - units.toMillis(durationAgo);
+ return ProfilePeriod.visitPeriods(start, end, tickDuration, tickUnit, Optional.empty(), period -> period);
+ }
+
+ @Override
+ public void initialize(Context context) {
+
+ }
+
+ @Override
+ public boolean isInitialized() {
+ return true;
+ }
+}
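The lookback arithmetic in `FixedLookback.apply` can be sketched in isolation. The class below is a hypothetical stand-in, not Metron's `ProfilePeriod`: it assumes a period is identified by the epoch time in milliseconds divided by the period length, and that every period overlapping the window is selected, including the one containing "now". The names `LookbackPeriods`, `periodIds`, and `fixedLookback` are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a fixed lookback; not the Metron implementation.
final class LookbackPeriods {

  private LookbackPeriods() {}

  /**
   * Returns the id of every period that overlaps [startMillis, endMillis].
   * Assumption: period id = epoch millis / period length in millis.
   */
  static List<Long> periodIds(long startMillis, long endMillis,
                              long periodDuration, TimeUnit periodUnits) {
    long periodMillis = periodUnits.toMillis(periodDuration);
    if (periodMillis <= 0) {
      throw new IllegalArgumentException("period duration must be positive");
    }
    if (endMillis < startMillis) {
      throw new IllegalArgumentException("end must not precede start");
    }
    long first = Math.floorDiv(startMillis, periodMillis);
    long last = Math.floorDiv(endMillis, periodMillis);
    List<Long> ids = new ArrayList<>();
    for (long id = first; id <= last; id++) {
      ids.add(id);
    }
    return ids;
  }

  /** Selects the periods covering the last 'durationAgo' units before 'nowMillis'. */
  static List<Long> fixedLookback(long nowMillis, long durationAgo, TimeUnit lookbackUnits,
                                  long periodDuration, TimeUnit periodUnits) {
    long start = nowMillis - lookbackUnits.toMillis(durationAgo);
    return periodIds(start, nowMillis, periodDuration, periodUnits);
  }

  public static void main(String[] args) {
    // Mirrors PROFILE_FIXED(5, 'HOURS') with the default 15 minute period.
    List<Long> ids = fixedLookback(System.currentTimeMillis(), 5, TimeUnit.HOURS,
                                   15, TimeUnit.MINUTES);
    System.out.println(ids.size() + " periods selected");
  }
}
```

Passing "now" as a parameter rather than reading the clock inside the function keeps the window calculation testable, which is one benefit of separating the timestamp calculation from the fetch.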
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index beb55e0..ecce7e0 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -20,16 +20,15 @@
package org.apache.metron.profiler.client.stellar;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.common.dsl.Context;
import org.apache.metron.common.dsl.ParseException;
import org.apache.metron.common.dsl.Stellar;
import org.apache.metron.common.dsl.StellarFunction;
-import org.apache.metron.common.utils.ConversionUtils;
import org.apache.metron.hbase.HTableProvider;
import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.client.HBaseProfilerClient;
import org.apache.metron.profiler.client.ProfilerClient;
import org.apache.metron.profiler.hbase.ColumnBuilder;
@@ -40,16 +39,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static java.lang.String.format;
-import static org.apache.metron.common.dsl.Context.Capabilities.GLOBAL_CONFIG;
+import static org.apache.metron.profiler.client.stellar.ProfilerConfig.*;
+import static org.apache.metron.profiler.client.stellar.Util.getArg;
+import static org.apache.metron.profiler.client.stellar.Util.getEffectiveConfig;
/**
* A Stellar function that can retrieve data contained within a Profile.
@@ -86,8 +82,7 @@ import static org.apache.metron.common.dsl.Context.Capabilities.GLOBAL_CONFIG;
params={
"profile - The name of the profile.",
"entity - The name of the entity.",
- "durationAgo - How long ago should values be retrieved from?",
- "units - The units of 'durationAgo'.",
+ "periods - The list of profile periods to grab. These are ProfilePeriod objects.",
"groups_list - Optional, must correspond to the 'groupBy' list used in profile creation - List (in square brackets) of "+
"groupBy values used to filter the profile. Default is the " +
"empty list, meaning groupBy was not used when creating the profile.",
@@ -98,62 +93,7 @@ import static org.apache.metron.common.dsl.Context.Capabilities.GLOBAL_CONFIG;
)
public class GetProfile implements StellarFunction {
- /**
- * A global property that defines the name of the HBase table used to store profile data.
- */
- public static final String PROFILER_HBASE_TABLE = "profiler.client.hbase.table";
-
- /**
- * A global property that defines the name of the column family used to store profile data.
- */
- public static final String PROFILER_COLUMN_FAMILY = "profiler.client.hbase.column.family";
-
- /**
- * A global property that defines the name of the HBaseTableProvider implementation class.
- */
- public static final String PROFILER_HBASE_TABLE_PROVIDER = "hbase.provider.impl";
- /**
- * A global property that defines the duration of each profile period. This value
- * should be defined along with 'profiler.client.period.duration.units'.
- */
- public static final String PROFILER_PERIOD = "profiler.client.period.duration";
-
- /**
- * A global property that defines the units of the profile period duration. This value
- * should be defined along with 'profiler.client.period.duration'.
- */
- public static final String PROFILER_PERIOD_UNITS = "profiler.client.period.duration.units";
-
- /**
- * A global property that defines the salt divisor used to store profile data.
- */
- public static final String PROFILER_SALT_DIVISOR = "profiler.client.salt.divisor";
-
- /**
- * The default Profile HBase table name should none be defined in the global properties.
- */
- public static final String PROFILER_HBASE_TABLE_DEFAULT = "profiler";
-
- /**
- * The default Profile column family name should none be defined in the global properties.
- */
- public static final String PROFILER_COLUMN_FAMILY_DEFAULT = "P";
-
- /**
- * The default Profile period duration should none be defined in the global properties.
- */
- public static final String PROFILER_PERIOD_DEFAULT = "15";
-
- /**
- * The default units of the Profile period should none be defined in the global properties.
- */
- public static final String PROFILER_PERIOD_UNITS_DEFAULT = "MINUTES";
-
- /**
- * The default salt divisor should none be defined in the global properties.
- */
- public static final String PROFILER_SALT_DIVISOR_DEFAULT = "1000";
/**
* Cached client that can retrieve profile values.
@@ -193,29 +133,27 @@ public class GetProfile implements StellarFunction {
String profile = getArg(0, String.class, args);
String entity = getArg(1, String.class, args);
- long durationAgo = getArg(2, Long.class, args);
- String unitsName = getArg(3, String.class, args);
- TimeUnit units = TimeUnit.valueOf(unitsName);
+ Optional<List<ProfilePeriod>> periods = Optional.ofNullable(getArg(2, List.class, args));
//Optional arguments
@SuppressWarnings("unchecked")
List<Object> groups = null;
Map configOverridesMap = null;
- if (args.size() < 5) {
+ if (args.size() < 4) {
// no optional args, so default 'groups' and configOverridesMap remains null.
groups = new ArrayList<>(0);
}
- else if (args.get(4) instanceof List) {
+ else if (args.get(3) instanceof List) {
// correct extensible usage
- groups = getArg(4, List.class, args);
- if (args.size() >= 6) {
- configOverridesMap = getArg(5, Map.class, args);
+ groups = getArg(3, List.class, args);
+ if (args.size() >= 5) {
+ configOverridesMap = getArg(4, Map.class, args);
if (configOverridesMap.isEmpty()) configOverridesMap = null;
}
}
else {
// Deprecated "varargs" style usage for groups_list
// configOverridesMap cannot be specified so it remains null.
- groups = getGroupsArg(4, args);
+ groups = getGroupsArg(3, args);
}
Map<String, Object> effectiveConfig = getEffectiveConfig(context, configOverridesMap);
@@ -229,83 +167,10 @@ public class GetProfile implements StellarFunction {
cachedConfigMap = effectiveConfig;
}
- return client.fetch(Object.class, profile, entity, groups, durationAgo, units);
+ return client.fetch(Object.class, profile, entity, groups, periods.orElse(new ArrayList<>(0)));
}
- /**
- * Merge the configuration parameter override Map into the config from global context,
- * and return the result. This has to be done on each call, because either may have changed.
- *
- * Only the six recognized profiler client config parameters may be set,
- * all other key-value pairs in either Map will be ignored.
- *
- * Type violations cause a Stellar ParseException.
- *
- * @param context - from which we get the global config Map.
- * @param configOverridesMap - Map of overrides as described above.
- * @return effective config Map with overrides applied.
- * @throws ParseException - if any override values are of wrong type.
- */
- private Map<String, Object> getEffectiveConfig(
- Context context
- , Map configOverridesMap
- ) throws ParseException {
-
- final String[] KEYLIST = {
- PROFILER_HBASE_TABLE, PROFILER_COLUMN_FAMILY,
- PROFILER_HBASE_TABLE_PROVIDER, PROFILER_PERIOD,
- PROFILER_PERIOD_UNITS, PROFILER_SALT_DIVISOR};
-
- // ensure the required capabilities are defined
- final Context.Capabilities[] required = { GLOBAL_CONFIG };
- validateCapabilities(context, required);
- @SuppressWarnings("unchecked")
- Map<String, Object> global = (Map<String, Object>) context.getCapability(GLOBAL_CONFIG).get();
-
- Map<String, Object> result = new HashMap<String, Object>(6);
- Object v;
-
- // extract the relevant parameters from global
- for (String k : KEYLIST) {
- v = global.get(k);
- if (v != null) result.put(k, v);
- }
- if (configOverridesMap == null) return result;
- // extract override values, typechecking as we go
- try {
- for (Object key : configOverridesMap.keySet()) {
- if (!(key instanceof String)) {
- // Probably unintended user error, so throw an exception rather than ignore
- throw new ParseException("Non-string key in config_overrides map is not allowed: " + key.toString());
- }
- switch ((String) key) {
- case PROFILER_HBASE_TABLE:
- case PROFILER_COLUMN_FAMILY:
- case PROFILER_HBASE_TABLE_PROVIDER:
- case PROFILER_PERIOD_UNITS:
- v = configOverridesMap.get(key);
- v = ConversionUtils.convert(v, String.class);
- result.put((String) key, v);
- break;
- case PROFILER_PERIOD:
- case PROFILER_SALT_DIVISOR:
- // be tolerant if the user put a number instead of a string
- // regardless, validate that it is an integer value
- v = configOverridesMap.get(key);
- long vlong = ConversionUtils.convert(v, Long.class);
- result.put((String) key, String.valueOf(vlong));
- break;
- default:
- LOG.warn("Ignoring unallowed key {} in config_overrides map.", key);
- break;
- }
- }
- } catch (ClassCastException | NumberFormatException cce) {
- throw new ParseException("Type violation in config_overrides map values: ", cce);
- }
- return result;
- }
/**
* Get the groups defined by the user.
@@ -329,40 +194,9 @@ public class GetProfile implements StellarFunction {
return groups;
}
- /**
- * Ensure that the required capabilities are defined.
- * @param context The context to validate.
- * @param required The required capabilities.
- * @throws IllegalStateException if all of the required capabilities are not present in the Context.
- */
- private void validateCapabilities(Context context, Context.Capabilities[] required) throws IllegalStateException {
- // collect the name of each missing capability
- String missing = Stream
- .of(required)
- .filter(c -> !context.getCapability(c).isPresent())
- .map(c -> c.toString())
- .collect(Collectors.joining(", "));
- if(StringUtils.isNotBlank(missing) || context == null) {
- throw new IllegalStateException("missing required context: " + missing);
- }
- }
-
- /**
- * Get an argument from a list of arguments.
- * @param index The index within the list of arguments.
- * @param clazz The type expected.
- * @param args All of the arguments.
- * @param <T> The type of the argument expected.
- */
- private <T> T getArg(int index, Class<T> clazz, List<Object> args) {
- if(index >= args.size()) {
- throw new IllegalArgumentException(format("expected at least %d argument(s), found %d", index+1, args.size()));
- }
- return ConversionUtils.convert(args.get(index), clazz);
- }
/**
* Creates the ColumnBuilder to use in accessing the profile data.
@@ -371,7 +205,7 @@ public class GetProfile implements StellarFunction {
private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
ColumnBuilder columnBuilder;
- String columnFamily = (String) global.getOrDefault(PROFILER_COLUMN_FAMILY, PROFILER_COLUMN_FAMILY_DEFAULT);
+ String columnFamily = PROFILER_COLUMN_FAMILY.get(global, String.class);
columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
return columnBuilder;
@@ -384,18 +218,16 @@ public class GetProfile implements StellarFunction {
private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
// how long is the profile period?
- String configuredDuration = (String) global.getOrDefault(PROFILER_PERIOD, PROFILER_PERIOD_DEFAULT);
- long duration = Long.parseLong(configuredDuration);
+ long duration = PROFILER_PERIOD.get(global, Long.class);
LOG.debug("profiler client: {}={}", PROFILER_PERIOD, duration);
// which units are used to define the profile period?
- String configuredUnits = (String) global.getOrDefault(PROFILER_PERIOD_UNITS, PROFILER_PERIOD_UNITS_DEFAULT);
+ String configuredUnits = PROFILER_PERIOD_UNITS.get(global, String.class);
TimeUnit units = TimeUnit.valueOf(configuredUnits);
LOG.debug("profiler client: {}={}", PROFILER_PERIOD_UNITS, units);
// what is the salt divisor?
- String configuredSaltDivisor = (String) global.getOrDefault(PROFILER_SALT_DIVISOR, PROFILER_SALT_DIVISOR_DEFAULT);
- int saltDivisor = Integer.parseInt(configuredSaltDivisor);
+ Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
LOG.debug("profiler client: {}={}", PROFILER_SALT_DIVISOR, saltDivisor);
return new SaltyRowKeyBuilder(saltDivisor, duration, units);
@@ -408,7 +240,7 @@ public class GetProfile implements StellarFunction {
*/
private HTableInterface getTable(Map<String, Object> global) {
- String tableName = (String) global.getOrDefault(PROFILER_HBASE_TABLE, PROFILER_HBASE_TABLE_DEFAULT);
+ String tableName = PROFILER_HBASE_TABLE.get(global, String.class);
TableProvider provider = getTableProvider(global);
try {
@@ -424,7 +256,7 @@ public class GetProfile implements StellarFunction {
* @param global The global configuration.
*/
private TableProvider getTableProvider(Map<String, Object> global) {
- String clazzName = (String) global.getOrDefault(PROFILER_HBASE_TABLE_PROVIDER, HTableProvider.class.getName());
+ String clazzName = PROFILER_HBASE_TABLE_PROVIDER.get(global, String.class);
TableProvider provider;
try {
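With `durationAgo` and `units` collapsed into a single `periods` argument, the optional arguments of `PROFILE_GET` start at index 3 instead of index 4. The following is a minimal sketch of that dispatch, assuming the same three cases as the hunk above: no optional arguments, a `groups_list` List optionally followed by a `config_overrides` Map, and the deprecated varargs style. `OptionalArgs` and `parse` are hypothetical names, and the varargs branch simply copies the trailing arguments rather than reproducing `getGroupsArg`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the optional-argument dispatch; not the Metron implementation.
final class OptionalArgs {
  final List<Object> groups;   // never null; empty when groupBy was not used
  final Map<?, ?> overrides;   // null when absent or empty

  private OptionalArgs(List<Object> groups, Map<?, ?> overrides) {
    this.groups = groups;
    this.overrides = overrides;
  }

  /** Parses everything after (profile, entity, periods). */
  static OptionalArgs parse(List<Object> args) {
    final int firstOptional = 3;
    if (args.size() <= firstOptional) {
      // no optional arguments at all
      return new OptionalArgs(new ArrayList<>(), null);
    }
    Object candidate = args.get(firstOptional);
    if (candidate instanceof List) {
      // preferred style: groups_list as a List, then an optional overrides Map
      List<Object> groups = new ArrayList<>((List<?>) candidate);
      Map<?, ?> overrides = null;
      if (args.size() > firstOptional + 1) {
        Object raw = args.get(firstOptional + 1);
        if (!(raw instanceof Map)) {
          throw new IllegalArgumentException("config_overrides must be a Map");
        }
        Map<?, ?> map = (Map<?, ?>) raw;
        overrides = map.isEmpty() ? null : map;
      }
      return new OptionalArgs(groups, overrides);
    }
    // deprecated varargs style: every remaining argument is a group value
    List<Object> groups = new ArrayList<>(args.subList(firstOptional, args.size()));
    return new OptionalArgs(groups, null);
  }

  public static void main(String[] args) {
    List<Object> call = Arrays.<Object>asList(
        "profile", "entity", Collections.emptyList(), Arrays.<Object>asList("weekdays"));
    System.out.println(parse(call).groups);
  }
}
```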
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
new file mode 100644
index 0000000..f409ca8
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerConfig.java
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.hbase.HTableProvider;
+
+import java.util.Map;
+
+public enum ProfilerConfig {
+ /**
+ * A global property that defines the name of the HBase table used to store profile data.
+ */
+ PROFILER_HBASE_TABLE("profiler.client.hbase.table", "profiler", String.class),
+
+ /**
+ * A global property that defines the name of the column family used to store profile data.
+ */
+ PROFILER_COLUMN_FAMILY("profiler.client.hbase.column.family", "P", String.class),
+
+ /**
+ * A global property that defines the name of the HBaseTableProvider implementation class.
+ */
+ PROFILER_HBASE_TABLE_PROVIDER("hbase.provider.impl", HTableProvider.class.getName(), String.class),
+
+ /**
+ * A global property that defines the duration of each profile period. This value
+ * should be defined along with 'profiler.client.period.duration.units'.
+ */
+ PROFILER_PERIOD("profiler.client.period.duration", 15L, Long.class),
+
+ /**
+ * A global property that defines the units of the profile period duration. This value
+ * should be defined along with 'profiler.client.period.duration'.
+ */
+ PROFILER_PERIOD_UNITS("profiler.client.period.duration.units", "MINUTES", String.class),
+
+ /**
+ * A global property that defines the salt divisor used to store profile data.
+ */
+ PROFILER_SALT_DIVISOR("profiler.client.salt.divisor", 1000L, Long.class);
+
+ String key;
+ Object defaultValue;
+ Class<?> valueType;
+ ProfilerConfig(String key, Object defaultValue, Class<?> valueType) {
+ this.key = key;
+ this.defaultValue = defaultValue;
+ this.valueType = valueType;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public Object getDefault() {
+ return getDefault(valueType);
+ }
+
+ public <T> T getDefault(Class<T> clazz) {
+ return defaultValue == null?null:ConversionUtils.convert(defaultValue, clazz);
+ }
+
+ public Object get(Map<String, Object> profilerConfig) {
+ return getOrDefault(profilerConfig, defaultValue);
+ }
+
+ public Object getOrDefault(Map<String, Object> profilerConfig, Object defaultValue) {
+ return getOrDefault(profilerConfig, defaultValue, valueType);
+ }
+
+ public <T> T get(Map<String, Object> profilerConfig, Class<T> clazz) {
+ return getOrDefault(profilerConfig, defaultValue, clazz);
+ }
+
+ public <T> T getOrDefault(Map<String, Object> profilerConfig, Object defaultValue, Class<T> clazz) {
+ Object o = profilerConfig.getOrDefault(key, defaultValue);
+ return o == null?null:ConversionUtils.convert(o, clazz);
+ }
+
+ @Override
+ public String toString() {
+ return key;
+ }
+
+}
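The pattern behind `ProfilerConfig` is an enum constant that bundles a config key with its default, so callers no longer pair `getOrDefault` with a separate `*_DEFAULT` constant and a manual parse. A reduced sketch follows, assuming a simplified `convert` helper in place of `ConversionUtils.convert` (which is not shown in this commit and may behave differently). `ClientSetting` is a hypothetical name; the keys and defaults are copied from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of the enum-with-defaults pattern; not the Metron class.
enum ClientSetting {
  PERIOD("profiler.client.period.duration", 15L),
  PERIOD_UNITS("profiler.client.period.duration.units", "MINUTES"),
  SALT_DIVISOR("profiler.client.salt.divisor", 1000L);

  private final String key;
  private final Object defaultValue;

  ClientSetting(String key, Object defaultValue) {
    this.key = key;
    this.defaultValue = defaultValue;
  }

  String getKey() {
    return key;
  }

  /** Looks up the setting, falling back to its default, and converts to the requested type. */
  <T> T get(Map<String, Object> config, Class<T> type) {
    Object raw = config.getOrDefault(key, defaultValue);
    return raw == null ? null : convert(raw, type);
  }

  // Simplified stand-in for ConversionUtils.convert: handles only Long, Integer, and String.
  private static <T> T convert(Object raw, Class<T> type) {
    if (type.isInstance(raw)) {
      return type.cast(raw);
    }
    Object converted;
    if (type == String.class) {
      converted = raw.toString();
    } else if (type == Long.class) {
      converted = (raw instanceof Number)
          ? Long.valueOf(((Number) raw).longValue())
          : Long.valueOf(raw.toString().trim());
    } else if (type == Integer.class) {
      converted = (raw instanceof Number)
          ? Integer.valueOf(((Number) raw).intValue())
          : Integer.valueOf(raw.toString().trim());
    } else {
      throw new IllegalArgumentException("unsupported target type: " + type.getName());
    }
    return type.cast(converted);
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put(PERIOD.getKey(), "5"); // a string value, as a config file might supply
    System.out.println(PERIOD.get(config, Long.class));
    System.out.println(SALT_DIVISOR.get(config, Integer.class));
  }
}
```

Because the conversion happens at lookup time, a default declared as a `Long` can still be requested as an `Integer`, which is how the salt divisor is read in `GetProfile`.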
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
new file mode 100644
index 0000000..ab22967
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.lang.String.format;
+import static org.apache.metron.common.dsl.Context.Capabilities.GLOBAL_CONFIG;
+
+public class Util {
+ private static final Logger LOG = LoggerFactory.getLogger(Util.class);
+
+ /**
+ * Ensure that the required capabilities are defined.
+ * @param context The context to validate.
+ * @param required The required capabilities.
+ * @throws IllegalStateException if any of the required capabilities is missing from the Context.
+ */
+ public static void validateCapabilities(Context context, Context.Capabilities[] required) throws IllegalStateException {
+ if(context == null) {
+ throw new IllegalStateException("missing required context: no Context was provided");
+ }
+ // collect the name of each missing capability
+ String missing = Stream.of(required)
+ .filter(c -> !context.getCapability(c).isPresent())
+ .map(c -> c.toString())
+ .collect(Collectors.joining(", "));
+ if(StringUtils.isNotBlank(missing)) {
+ throw new IllegalStateException("missing required context: " + missing);
+ }
+ }
+
+ /**
+ * Merge the configuration parameter override Map into the config from global context,
+ * and return the result. This has to be done on each call, because either may have changed.
+ *
+ * Only the six recognized profiler client config parameters may be set;
+ * all other key-value pairs in either Map will be ignored.
+ *
+ * Type violations cause a Stellar ParseException.
+ *
+ * @param context - from which we get the global config Map.
+ * @param configOverridesMap - Map of overrides as described above.
+ * @return effective config Map with overrides applied.
+ * @throws ParseException - if any override values are of wrong type.
+ */
+ public static Map<String, Object> getEffectiveConfig(Context context , Map configOverridesMap ) throws ParseException {
+ // ensure the required capabilities are defined
+ final Context.Capabilities[] required = { GLOBAL_CONFIG };
+ validateCapabilities(context, required);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> global = (Map<String, Object>) context.getCapability(GLOBAL_CONFIG).get();
+
+ Map<String, Object> result = new HashMap<>(6);
+
+ // extract the relevant parameters from global, the overrides and the defaults
+ for (ProfilerConfig k : ProfilerConfig.values()) {
+ Object globalValue = global.containsKey(k.key)?ConversionUtils.convert(global.get(k.key), k.valueType):null;
+ Object overrideValue = configOverridesMap == null?null:k.getOrDefault(configOverridesMap, null);
+ Object defaultValue = k.defaultValue;
+ if(overrideValue != null) {
+ result.put(k.key, overrideValue);
+ }
+ else if(globalValue != null) {
+ result.put(k.key, globalValue);
+ }
+ else if(defaultValue != null) {
+ result.put(k.key, defaultValue);
+ }
+ }
+ return result;
+ }
+
+
+ /**
+ * Get an argument from a list of arguments.
+ * @param index The index within the list of arguments.
+ * @param clazz The type expected.
+ * @param args All of the arguments.
+ * @param <T> The type of the argument expected.
+ */
+ public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+ if(index >= args.size()) {
+ throw new IllegalArgumentException(format("expected at least %d argument(s), found %d", index+1, args.size()));
+ }
+
+ return ConversionUtils.convert(args.get(index), clazz);
+ }
+}
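The precedence applied by `getEffectiveConfig` above (override first, then global config, then the built-in default) can be illustrated in isolation. The following is only a minimal sketch, not Metron code: the class `EffectiveConfigSketch` and its `merge` method are hypothetical, the default values are illustrative, and the type conversion done by `ConversionUtils` is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the precedence used by Util.getEffectiveConfig; not Metron code.
class EffectiveConfigSketch {

  // For each recognized key: the override wins, then the global value, then the default.
  // Keys absent from 'defaults' are ignored, mirroring the loop over ProfilerConfig.values().
  static Map<String, Object> merge(Map<String, Object> defaults,
                                   Map<String, Object> global,
                                   Map<String, Object> overrides) {
    Map<String, Object> result = new HashMap<>();
    for (Map.Entry<String, Object> entry : defaults.entrySet()) {
      String key = entry.getKey();
      Object value = (overrides == null) ? null : overrides.get(key);
      if (value == null) {
        value = global.get(key);
      }
      if (value == null) {
        value = entry.getValue();
      }
      if (value != null) {
        result.put(key, value);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Object> defaults = new HashMap<>();
    defaults.put("profiler.client.period.duration", 15L); // illustrative default
    defaults.put("profiler.client.salt.divisor", 1000);   // illustrative default

    Map<String, Object> global = new HashMap<>();
    global.put("profiler.client.period.duration", 5L);
    global.put("some.other.setting", "ignored");          // not a recognized key

    Map<String, Object> overrides = new HashMap<>();
    overrides.put("profiler.client.salt.divisor", 10);

    System.out.println(merge(defaults, global, overrides));
  }
}
```

In this sketch the period duration comes from the global map, the salt divisor from the override, and the unrecognized key is dropped.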
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
index 960795b..e1ebdbd 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
@@ -28,6 +28,7 @@ import org.apache.metron.common.dsl.functions.resolver.SingletonFunctionResolver
import org.apache.metron.common.dsl.ParseException;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.client.stellar.FixedLookback;
import org.apache.metron.profiler.client.stellar.GetProfile;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
@@ -49,12 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_COLUMN_FAMILY;
-import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_PERIOD;
-import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_PERIOD_UNITS;
-import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.client.stellar.ProfilerConfig.*;
/**
* Tests the GetProfile class.
@@ -114,18 +110,19 @@ public class GetProfileTest {
// global properties
Map<String, Object> global = new HashMap<String, Object>() {{
- put(PROFILER_HBASE_TABLE, tableName);
- put(PROFILER_COLUMN_FAMILY, columnFamily);
- put(PROFILER_HBASE_TABLE_PROVIDER, MockTableProvider.class.getName());
- put(PROFILER_PERIOD, Long.toString(periodDuration));
- put(PROFILER_PERIOD_UNITS, periodUnits.toString());
- put(PROFILER_SALT_DIVISOR, Integer.toString(saltDivisor));
+ put(PROFILER_HBASE_TABLE.getKey(), tableName);
+ put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+ put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockTableProvider.class.getName());
+ put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
+ put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
+ put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
}};
// create the stellar execution environment
executor = new DefaultStellarExecutor(
new SimpleFunctionResolver()
- .withClass(GetProfile.class),
+ .withClass(GetProfile.class)
+ .withClass(FixedLookback.class),
new Context.Builder()
.with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
.build());
@@ -154,12 +151,12 @@ public class GetProfileTest {
// global properties
Map<String, Object> global = new HashMap<String, Object>() {{
- put(PROFILER_HBASE_TABLE, tableName);
- put(PROFILER_COLUMN_FAMILY, columnFamily);
- put(PROFILER_HBASE_TABLE_PROVIDER, MockTableProvider.class.getName());
- put(PROFILER_PERIOD, Long.toString(periodDuration2));
- put(PROFILER_PERIOD_UNITS, periodUnits2.toString());
- put(PROFILER_SALT_DIVISOR, Integer.toString(saltDivisor2));
+ put(PROFILER_HBASE_TABLE.getKey(), tableName);
+ put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+ put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockTableProvider.class.getName());
+ put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration2));
+ put(PROFILER_PERIOD_UNITS.getKey(), periodUnits2.toString());
+ put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor2));
}};
// create the modified context
@@ -170,7 +167,8 @@ public class GetProfileTest {
// create the stellar execution environment
executor = new DefaultStellarExecutor(
new SimpleFunctionResolver()
- .withClass(GetProfile.class),
+ .withClass(GetProfile.class)
+ .withClass(FixedLookback.class),
context2);
return context2; //because there is no executor.getContext() method
@@ -197,7 +195,7 @@ public class GetProfileTest {
profileWriter.write(m, count, group, val -> expectedValue);
// execute - read the profile values - no groups
- String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS')";
+ String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
@SuppressWarnings("unchecked")
List<Integer> result = run(expr, List.class);
@@ -228,7 +226,7 @@ public class GetProfileTest {
state.put("groups", group);
// execute - read the profile values
- String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', ['weekends'])";
+ String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
@SuppressWarnings("unchecked")
List<Integer> result = run(expr, List.class);
@@ -236,7 +234,7 @@ public class GetProfileTest {
Assert.assertEquals(count, result.size());
// test the deprecated but allowed "varargs" form of groups specification
- expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', 'weekends')";
+ expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekends')";
result = run(expr, List.class);
// validate - expect to read all values from the past 4 hours
@@ -266,7 +264,7 @@ public class GetProfileTest {
state.put("groups", group);
// execute - read the profile values
- String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', ['weekdays', 'tuesday'])";
+ String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekdays', 'tuesday'])";
@SuppressWarnings("unchecked")
List<Integer> result = run(expr, List.class);
@@ -274,7 +272,7 @@ public class GetProfileTest {
Assert.assertEquals(count, result.size());
// test the deprecated but allowed "varargs" form of groups specification
- expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', 'weekdays', 'tuesday')";
+ expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekdays', 'tuesday')";
result = run(expr, List.class);
// validate - expect to read all values from the past 4 hours
@@ -295,7 +293,7 @@ public class GetProfileTest {
SingletonFunctionResolver.getInstance().initialize(empty);
// validate - function should be unable to initialize
- String expr = "PROFILE_GET('profile1', 'entity1', 1000, 'SECONDS', groups)";
+ String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(1000, 'SECONDS'), groups)";
run(expr, List.class);
}
@@ -321,7 +319,7 @@ public class GetProfileTest {
state.put("groups", group);
// execute - read the profile values
- String expr = "PROFILE_GET('profile1', 'entity1', 4, 'SECONDS')";
+ String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'SECONDS'))";
@SuppressWarnings("unchecked")
List<Integer> result = run(expr, List.class);
@@ -353,13 +351,13 @@ public class GetProfileTest {
// validate it is changed in significant way
@SuppressWarnings("unchecked")
Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
- Assert.assertEquals(global.get(PROFILER_PERIOD), Long.toString(periodDuration2));
+ Assert.assertEquals(PROFILER_PERIOD.get(global), periodDuration2);
Assert.assertNotEquals(periodDuration, periodDuration2);
// execute - read the profile values - with (wrong) default global config values.
// No error message at this time, but returns empty results list, because
// row keys are not correctly calculated.
- String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS')";
+ String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
@SuppressWarnings("unchecked")
List<Integer> result = run(expr, List.class);
@@ -368,10 +366,11 @@ public class GetProfileTest {
// execute - read the profile values - with config_override.
// first two override values are strings, third is deliberately a number.
- expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', [], {"
- + "'profiler.client.period.duration' : '" + periodDuration + "', "
+ String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
+ "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
- + "'profiler.client.salt.divisor' : " + saltDivisor + " })";
+ + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
+ expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS', " + overrides + "), [], " + overrides + ")"
+ ;
result = run(expr, List.class);
// validate - expect to read all values from the past 4 hours
@@ -407,15 +406,17 @@ public class GetProfileTest {
// validate it is changed in significant way
@SuppressWarnings("unchecked")
Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
- Assert.assertEquals(global.get(PROFILER_PERIOD), Long.toString(periodDuration2));
+ Assert.assertEquals(global.get(PROFILER_PERIOD.getKey()), Long.toString(periodDuration2));
Assert.assertNotEquals(periodDuration, periodDuration2);
// execute - read the profile values - with config_override.
// first two override values are strings, third is deliberately a number.
- String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', ['weekends'], {"
- + "'profiler.client.period.duration' : '" + periodDuration + "', "
+ String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
+ "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
- + "'profiler.client.salt.divisor' : " + saltDivisor + " })";
+ + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
+ String expr = "PROFILE_GET('profile1', 'entity1'" +
+ ", PROFILE_FIXED(4, 'HOURS', " + overrides + "), ['weekends'], " +
+ overrides + ")";
@SuppressWarnings("unchecked")
List<Integer> result = run(expr, List.class);
@@ -425,7 +426,7 @@ public class GetProfileTest {
// execute - read the profile values - with (wrong) default global config values.
// No error message at this time, but returns empty results list, because
// row keys are not correctly calculated.
- expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', ['weekends'])";
+ expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
result = run(expr, List.class);
// validate - expect to fail to read any values
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
index c466919..f916d65 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfilePeriod.java
@@ -20,7 +20,12 @@
package org.apache.metron.profiler;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
import static java.lang.String.format;
@@ -41,6 +46,7 @@ public class ProfilePeriod {
*/
private long durationMillis;
+
/**
* @param epochMillis A timestamp contained somewhere within the profile period.
* @param duration The duration of each profile period.
@@ -51,7 +57,6 @@ public class ProfilePeriod {
throw new IllegalArgumentException(format(
"period duration must be greater than 0; got '%d %s'", duration, units));
}
-
this.durationMillis = units.toMillis(duration);
this.period = epochMillis / durationMillis;
}
@@ -75,6 +80,7 @@ public class ProfilePeriod {
return period;
}
+
public long getDurationMillis() {
return durationMillis;
}
@@ -103,4 +109,23 @@ public class ProfilePeriod {
", durationMillis=" + durationMillis +
'}';
}
+
+ public static <T> List<T> visitPeriods(long startEpochMillis
+ , long endEpochMillis
+ , long duration
+ , TimeUnit units
+ , Optional<Predicate<ProfilePeriod>> inclusionPredicate
+ , Function<ProfilePeriod,T> transformation
+ )
+ {
+ ProfilePeriod period = new ProfilePeriod(startEpochMillis, duration, units);
+ List<T> ret = new ArrayList<>();
+ while(period.getStartTimeMillis() <= endEpochMillis) {
+ if(!inclusionPredicate.isPresent() || inclusionPredicate.get().test(period)) {
+ ret.add(transformation.apply(period));
+ }
+ period = period.next();
+ }
+ return ret;
+ }
}
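The walk performed by `visitPeriods` can be sketched without the `ProfilePeriod` class. This is a simplified illustration under stated assumptions, not the committed code: a period is represented by its index alone (`epochMillis / durationMillis`, as in the constructor above), its start time is assumed to be `index * durationMillis`, and the names `PeriodWalkSketch` and `visit` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
import java.util.function.LongPredicate;

// Hypothetical, index-only version of the period walk; not Metron code.
class PeriodWalkSketch {

  // Visits every period from the one containing startMillis up to the last one
  // whose start time is <= endMillis. A null predicate means "include all".
  static <T> List<T> visit(long startMillis, long endMillis, long duration, TimeUnit units,
                           LongPredicate include, LongFunction<T> transform) {
    long durationMillis = units.toMillis(duration);
    if (durationMillis <= 0) {
      throw new IllegalArgumentException("period duration must be at least 1 millisecond");
    }
    List<T> out = new ArrayList<>();
    // assumption: a period's start time is its index multiplied by the duration
    for (long period = startMillis / durationMillis; period * durationMillis <= endMillis; period++) {
      if (include == null || include.test(period)) {
        out.add(transform.apply(period));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // 15-minute periods between minute 10 and minute 40 of the epoch
    List<Long> periods = visit(600_000L, 2_400_000L, 15, TimeUnit.MINUTES, null, p -> p);
    System.out.println(periods);
  }
}
```

Passing a predicate plays the role of the `Optional<Predicate<ProfilePeriod>>` argument: periods that fail it are skipped, but the walk still advances to the next period.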
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
index b53a1ac..e49bb0a 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/RowKeyBuilder.java
@@ -21,6 +21,7 @@
package org.apache.metron.profiler.hbase;
import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
import java.io.Serializable;
import java.util.List;
@@ -56,4 +57,19 @@ public interface RowKeyBuilder extends Serializable {
* @return All of the row keys necessary to retrieve the profile measurements.
*/
List<byte[]> rowKeys(String profile, String entity, List<Object> groups, long start, long end);
+
+ /**
+ * Builds a list of row keys necessary to retrieve a profile's measurements over
+ * a time horizon.
+ *
+ * This method is useful when attempting to read ProfileMeasurements stored in HBase.
+ *
+ * @param profile The name of the profile.
+ * @param entity The name of the entity.
+ * @param groups The group(s) used to sort the profile data.
+ * @param periods The profile measurement periods to compute the rowkeys for
+ * @return All of the row keys necessary to retrieve the profile measurements.
+ */
+ List<byte[]> rowKeys(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
index 4e2b44f..b01fc28 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
@@ -29,6 +29,7 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@@ -81,24 +82,40 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
*/
@Override
public List<byte[]> rowKeys(String profile, String entity, List<Object> groups, long start, long end) {
- List<byte[]> rowKeys = new ArrayList<>();
-
// be forgiving of out-of-order start and end times; order is critical to this algorithm
end = Math.max(start, end);
start = Math.min(start, end);
// find the starting period and advance until the end time is reached
- ProfilePeriod period = new ProfilePeriod(start, periodDurationMillis, TimeUnit.MILLISECONDS);
- while(period.getStartTimeMillis() <= end) {
+ return ProfilePeriod.visitPeriods( start
+ , end
+ , periodDurationMillis
+ , TimeUnit.MILLISECONDS
+ , Optional.empty()
+ , period -> rowKey(profile, entity, period, groups)
+ );
- byte[] k = rowKey(profile, entity, period, groups);
- rowKeys.add(k);
+ }
- // advance to the next period
- period = period.next();
+ /**
+ * Builds a list of row keys necessary to retrieve a profile's measurements over
+ * a time horizon.
+ * <p>
+ * This method is useful when attempting to read ProfileMeasurements stored in HBase.
+ *
+ * @param profile The name of the profile.
+ * @param entity The name of the entity.
+ * @param groups The group(s) used to sort the profile data.
+ * @param periods The profile measurement periods to compute the rowkeys for
+ * @return All of the row keys necessary to retrieve the profile measurements.
+ */
+ @Override
+ public List<byte[]> rowKeys(String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods) {
+ List<byte[]> ret = new ArrayList<>();
+ for(ProfilePeriod period : periods) {
+ ret.add(rowKey(profile, entity, period, groups));
}
-
- return rowKeys;
+ return ret;
}
/**
@@ -120,6 +137,18 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
* @return The HBase row key.
*/
public byte[] rowKey(String profile, String entity, ProfilePeriod period, List<Object> groups) {
+ return rowKey(profile, entity, period.getPeriod(), groups);
+ }
+
+ /**
+ * Build the row key.
+ * @param profile The name of the profile.
+ * @param entity The name of the entity.
+ * @param period The measurement period, as returned by ProfilePeriod.getPeriod().
+ * @param groups The groups.
+ * @return The HBase row key.
+ */
+ public byte[] rowKey(String profile, String entity, long period, List<Object> groups) {
// row key = salt + prefix + group(s) + time
byte[] salt = getSalt(period, saltDivisor);
@@ -161,25 +190,44 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
groups.forEach(g -> builder.append(g));
return Bytes.toBytes(builder.toString());
}
-
/**
* Builds the 'time' portion of the row key
* @param period The ProfilePeriod in which the ProfileMeasurement was taken.
*/
private static byte[] timeKey(ProfilePeriod period) {
- long thePeriod = period.getPeriod();
- return Bytes.toBytes(thePeriod);
+ return timeKey(period.getPeriod());
+ }
+
+ /**
+ * Builds the 'time' portion of the row key
+ * @param period the period
+ */
+ private static byte[] timeKey(long period) {
+ return Bytes.toBytes(period);
}
/**
* Calculates a salt value that is used as part of the row key.
*
- * The salt is calculated as 'md5(timestamp) % N' where N is a configurable value that ideally
+ * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
* is close to the number of nodes in the Hbase cluster.
*
* @param period The period in which a profile measurement is taken.
*/
public static byte[] getSalt(ProfilePeriod period, int saltDivisor) {
+ return getSalt(period.getPeriod(), saltDivisor);
+ }
+
+ /**
+ * Calculates a salt value that is used as part of the row key.
+ *
+ * The salt is calculated as 'md5(period) % N' where N is a configurable value that ideally
+ * is close to the number of nodes in the HBase cluster.
+ *
+ * @param period The period
+ * @param saltDivisor The salt divisor
+ */
+ public static byte[] getSalt(long period, int saltDivisor) {
try {
// an MD5 is 16 bytes aka 128 bits
MessageDigest digest = MessageDigest.getInstance("MD5");
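The hunk above stops before the body of `getSalt`, so the exact computation is not shown here. As a rough illustration of the 'md5(period) % N' idea described in the Javadoc, the sketch below hashes the period and reduces it modulo the salt divisor. The class `SaltSketch`, the use of the first four digest bytes, and the `int` return type are all assumptions made for this example; the committed method returns a `byte[]` and may reduce the digest differently.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical illustration of 'md5(period) % N'; not the Metron implementation.
class SaltSketch {

  static int salt(long period, int saltDivisor) {
    if (saltDivisor <= 0) {
      throw new IllegalArgumentException("saltDivisor must be positive");
    }
    try {
      // an MD5 digest is 16 bytes
      MessageDigest digest = MessageDigest.getInstance("MD5");
      byte[] periodBytes = ByteBuffer.allocate(Long.BYTES).putLong(period).array();
      byte[] hash = digest.digest(periodBytes);

      // assumption: use the first 4 digest bytes as the integer to reduce
      int leading = ByteBuffer.wrap(hash).getInt();

      // floorMod keeps the result in [0, saltDivisor) even when 'leading' is negative
      return Math.floorMod(leading, saltDivisor);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  public static void main(String[] args) {
    for (long period = 0; period < 5; period++) {
      System.out.println("period " + period + " -> salt " + salt(period, 1000));
    }
  }
}
```

Because the salt depends only on the period and the divisor, a reader that knows both can recompute the same prefix, which is why the client's salt divisor must match the one used when the profile was written.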
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-profiler/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/README.md b/metron-analytics/metron-profiler/README.md
index 04e1c0d..dfff277 100644
--- a/metron-analytics/metron-profiler/README.md
+++ b/metron-analytics/metron-profiler/README.md
@@ -66,11 +66,11 @@ This section will describe the steps required to get your first profile running.
1. Use the Profiler Client to read the profile data. The below example `PROFILE_GET` command will read data written by the sample profile given above, if 10.0.0.1 is one of the input values for `ip_src_addr`.
More information on configuring and using the client can be found [here](../metron-profiler-client).
-It is assumed that the PROFILE_GET client is correctly configured before using it.
+It is assumed that the `PROFILE_GET` client is correctly configured before using it.
```
$ bin/stellar -z node1:2181
- [Stellar]>>> PROFILE_GET( "test", "10.0.0.1", 30, "MINUTES")
+ [Stellar]>>> PROFILE_GET( "test", "10.0.0.1", PROFILE_FIXED(30, "MINUTES"))
[451, 448]
```
@@ -334,7 +334,7 @@ Retrieve the last 30 minutes of profile measurements for a specific host.
```
$ bin/stellar -z node1:2181
-[Stellar]>>> stats := PROFILE_GET( "example4", "10.0.0.1", 30, "MINUTES")
+[Stellar]>>> stats := PROFILE_GET( "example4", "10.0.0.1", PROFILE_FIXED(30, "MINUTES"))
[Stellar]>>> stats
[org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9, ...]
```
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-analytics/metron-statistics/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-statistics/README.md b/metron-analytics/metron-statistics/README.md
index 257fd0b..f6ab15f 100644
--- a/metron-analytics/metron-statistics/README.md
+++ b/metron-analytics/metron-statistics/README.md
@@ -341,7 +341,7 @@ Create the following in
"stellar" : {
"config" : {
"parser_score" : "OUTLIER_MAD_SCORE(OUTLIER_MAD_STATE_MERGE(
-PROFILE_GET( 'sketchy_mad', 'global', 10, 'MINUTES') ), value)"
+PROFILE_GET( 'sketchy_mad', 'global', PROFILE_FIXED(10, 'MINUTES')) ), value)"
,"is_alert" : "if parser_score > 3.5 then true else is_alert"
}
}
@@ -384,7 +384,7 @@ Create the following file at
"onlyif": "true",
"init" : {
"s": "OUTLIER_MAD_STATE_MERGE(PROFILE_GET('sketchy_mad',
-'global', 5, 'MINUTES'))"
+'global', PROFILE_FIXED(5, 'MINUTES')))"
},
"update": {
"s": "OUTLIER_MAD_ADD(s, value)"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/57c38af1/metron-platform/metron-common/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
index c24ae73..fbf3b50 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -124,6 +124,7 @@ The `!=` operator is the negation of the above.
| [ `MAP_EXISTS`](#map_exists) |
| [ `MONTH`](#month) |
| [ `PROFILE_GET`](#profile_get) |
+| [ `PROFILE_FIXED`](#profile_fixed) |
| [ `PROTOCOL_TO_NAME`](#protocol_to_name) |
| [ `REGEXP_MATCH`](#regexp_match) |
| [ `SPLIT`](#split) |
@@ -439,12 +440,19 @@ The `!=` operator is the negation of the above.
* Input:
* profile - The name of the profile.
* entity - The name of the entity.
- * durationAgo - How long ago should values be retrieved from?
- * units - The units of 'durationAgo'.
+ * periods - The list of profile periods to grab. These are ProfilePeriod objects.
* groups_list - Optional, must correspond to the 'groupBy' list used in profile creation - List (in square brackets) of groupBy values used to filter the profile. Default is the empty list, meaning groupBy was not used when creating the profile.
* config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter of the same name. Default is the empty Map, meaning no overrides.
* Returns: The selected profile measurements.
+### `PROFILE_FIXED`
+ * Description: The profile periods associated with a fixed lookback starting from now
+ * Input:
+ * durationAgo - How long ago should values be retrieved from?
+ * units - The units of 'durationAgo'.
+ * config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter of the same name. Default is the empty Map, meaning no overrides.
+ * Returns: The profile periods covering the lookback window. These are ProfilePeriod objects.
+
### `PROTOCOL_TO_NAME`
* Description: Converts the IANA protocol number to the protocol name
* Input: