You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/09/13 18:36:33 UTC
incubator-metron git commit: METRON-391 Create Stellar Function to
Read Profile Data for Model Scoring (nickwallen via cestella) closes
apache/incubator-metron#242
Repository: incubator-metron
Updated Branches:
refs/heads/master eaf625a79 -> ef3e9fa5e
METRON-391 Create Stellar Function to Read Profile Data for Model Scoring (nickwallen via cestella) closes apache/incubator-metron#242
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/ef3e9fa5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/ef3e9fa5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/ef3e9fa5
Branch: refs/heads/master
Commit: ef3e9fa5ea506f3ba624ca2102ea9028782dd974
Parents: eaf625a
Author: nickwallen <ni...@nickallen.org>
Authored: Tue Sep 13 14:36:20 2016 -0400
Committer: cstella <ce...@gmail.com>
Committed: Tue Sep 13 14:36:20 2016 -0400
----------------------------------------------------------------------
.../profiler/client/HBaseProfilerClient.java | 46 ++--
.../profiler/client/stellar/GetProfile.java | 269 +++++++++++++++++++
.../metron/profiler/client/GetProfileTest.java | 234 ++++++++++++++++
.../client/HBaseProfilerClientTest.java | 93 ++-----
.../metron/profiler/client/ProfileWriter.java | 97 +++++++
.../metron/profiler/ProfileMeasurement.java | 14 +-
.../profiler/bolt/ProfileBuilderBolt.java | 1 +
.../profiler/bolt/ProfileSplitterBolt.java | 5 +-
.../stellar/DefaultStellarExecutor.java | 63 +++--
.../profiler/stellar/StellarExecutor.java | 30 ++-
.../stellar/DefaultStellarExecutorTest.java | 19 ++
.../org/apache/metron/common/dsl/Context.java | 3 +-
.../metron/common/dsl/FunctionResolver.java | 23 ++
.../common/dsl/FunctionResolverSingleton.java | 30 ++-
.../metron/common/dsl/StellarFunctionInfo.java | 33 ++-
.../src/main/config/zookeeper/global.json | 12 +-
16 files changed, 832 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
index b4e52b5..97691d4 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -64,43 +64,57 @@ public class HBaseProfilerClient implements ProfilerClient {
/**
* Fetches all of the data values associated with a Profile.
*
- * @param profile The name of the profile.
- * @param entity The name of the entity.
+ * @param profile The name of the profile.
+ * @param entity The name of the entity.
* @param durationAgo How far in the past to fetch values from.
- * @param unit The time unit of 'durationAgo'.
- * @param groups The groups
- * @param <T> The type of values stored by the Profile.
+ * @param unit The time unit of 'durationAgo'.
+ * @param groups The groups
+ * @param <T> The type of values stored by the Profile.
* @return A list of profile values.
*/
@Override
public <T> List<T> fetch(String profile, String entity, long durationAgo, TimeUnit unit, Class<T> clazz, List<Object> groups) {
+ byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
+ byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
// find all the row keys that satisfy this fetch
List<byte[]> keysToFetch = rowKeyBuilder.rowKeys(profile, entity, groups, durationAgo, unit);
- byte[] columnFamilyBytes = Bytes.toBytes(columnBuilder.getColumnFamily());
- byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
// create a Get for each of the row keys
List<Get> gets = keysToFetch
.stream()
- .map(k -> new Get(k).addColumn(columnFamilyBytes, columnQualifier))
+ .map(k -> new Get(k).addColumn(columnFamily, columnQualifier))
.collect(Collectors.toList());
- // submit the gets to HBase
- try {
- List<T> values = new ArrayList<>();
+ // get the 'gets'
+ return get(gets, columnQualifier, columnFamily, clazz);
+ }
+ /**
+ * Submits multiple Gets to HBase and deserializes the results.
+ *
+ * @param gets The gets to submit to HBase.
+ * @param columnQualifier The column qualifier.
+ * @param columnFamily The column family.
+ * @param clazz The type expected in return.
+ * @param <T> The type expected in return.
+ * @return The deserialized values from each result that contains the requested column.
+ */
+ private <T> List<T> get(List<Get> gets, byte[] columnQualifier, byte[] columnFamily, Class<T> clazz) {
+ List<T> values = new ArrayList<>();
+
+ try {
Result[] results = table.get(gets);
Arrays.stream(results)
- .filter(r -> r.containsColumn(columnFamilyBytes, columnQualifier))
- .map(r -> r.getValue(columnFamilyBytes, columnQualifier))
+ .filter(r -> r.containsColumn(columnFamily, columnQualifier))
+ .map(r -> r.getValue(columnFamily, columnQualifier))
.forEach(val -> values.add(Serializer.fromBytes(val, clazz)));
- return values;
-
} catch(IOException e) {
throw new RuntimeException(e);
}
+
+ return values;
}
public void setTable(HTableInterface table) {
@@ -114,4 +128,4 @@ public class HBaseProfilerClient implements ProfilerClient {
public void setColumnBuilder(ColumnBuilder columnBuilder) {
this.columnBuilder = columnBuilder;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
new file mode 100644
index 0000000..ffe9470
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -0,0 +1,269 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client.stellar;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.common.dsl.Stellar;
+import org.apache.metron.common.dsl.StellarFunction;
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.profiler.client.HBaseProfilerClient;
+import org.apache.metron.profiler.client.ProfilerClient;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.lang.String.format;
+import static org.apache.metron.common.dsl.Context.Capabilities.GLOBAL_CONFIG;
+
+/**
+ * A Stellar function that can retrieve data contained within a Profile.
+ *
+ * PROFILE_GET
+ *
+ * Retrieve all values for 'entity1' from 'profile1' over the past 4 hours.
+ *
+ * <code>PROFILE_GET('profile1', 'entity1', 4, 'HOURS')</code>
+ *
+ * Retrieve all values for 'entity1' from 'profile1' over the past 2 days.
+ *
+ * <code>PROFILE_GET('profile1', 'entity1', 2, 'DAYS')</code>
+ *
+ * Retrieve all values for 'entity1' from 'profile1' that occurred on 'weekdays' over the past month.
+ *
+ * <code>PROFILE_GET('profile1', 'entity1', 1, 'MONTHS', 'weekdays')</code>
+ *
+ */
+@Stellar(
+ namespace="PROFILE",
+ name="GET",
+ description="Retrieves a series of values from a stored profile.",
+ params={
+ "profile - The name of the profile.",
+ "entity - The name of the entity.",
+ "durationAgo - How long ago should values be retrieved from?",
+ "units - The units of 'durationAgo'.",
+ "groups - Optional - The groups used to sort the profile."
+ },
+ returns="The profile measurements."
+)
+public class GetProfile implements StellarFunction {
+
+ /**
+ * A global property that defines the name of the HBase table storing profile data.
+ */
+ public static final String PROFILER_HBASE_TABLE = "profiler.hbase.table";
+
+ /**
+ * A global property that defines the name of the column family used to store profile data.
+ */
+ public static final String PROFILER_COLUMN_FAMILY = "profiler.column.family";
+
+ /**
+ * A global property that defines the name of the TableProvider implementation class.
+ */
+ public static final String PROFILER_HBASE_TABLE_PROVIDER = "profiler.hbase.table.provider";
+
+ /**
+ * A client that can retrieve profile values.
+ */
+ private ProfilerClient client;
+
+ /**
+ * Initialization.
+ */
+ @Override
+ public void initialize(Context context) {
+
+ // ensure the required capabilities are defined
+ Context.Capabilities[] required = { GLOBAL_CONFIG };
+ validateCapabilities(context, required);
+ Map<String, Object> global = (Map<String, Object>) context.getCapability(GLOBAL_CONFIG).get();
+
+ // create the profiler client
+ ColumnBuilder columnBuilder = getColumnBuilder(global);
+ RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(global);
+ HTableInterface table = getTable(global);
+ client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
+ }
+
+ /**
+ * Is the function initialized?
+ */
+ @Override
+ public boolean isInitialized() {
+ return client != null;
+ }
+
+ /**
+ * Apply the function.
+ * @param args The function arguments.
+ * @param context
+ */
+ @Override
+ public Object apply(List<Object> args, Context context) throws ParseException {
+
+ String profile = getArg(0, String.class, args);
+ String entity = getArg(1, String.class, args);
+ long durationAgo = getArg(2, Long.class, args);
+ String unitsName = getArg(3, String.class, args);
+ TimeUnit units = TimeUnit.valueOf(unitsName);
+ List<Object> groups = getGroupsArg(4, args);
+
+ return client.fetch(profile, entity, durationAgo, units, Integer.class, groups);
+ }
+
+ /**
+ * Get the groups defined by the user.
+ *
+ * The user can specify 0 or more groups. All arguments from the specified position
+ * on are assumed to be groups. If there is no argument in the specified position,
+ * then it is assumed the user did not specify any groups.
+ *
+ * @param startIndex The starting index of groups within the function argument list.
+ * @param args The function arguments.
+ * @return The groups.
+ */
+ private List<Object> getGroupsArg(int startIndex, List<Object> args) {
+ List<Object> groups = new ArrayList<>();
+
+ for(int i=startIndex; i<args.size(); i++) {
+ String group = getArg(i, String.class, args);
+ groups.add(group);
+ }
+
+ return groups;
+ }
+
+ /**
+ * Ensure that the required capabilities are defined.
+ * @param context The context to validate.
+ * @param required The required capabilities.
+ * @throws IllegalStateException if any of the required capabilities is not present in the Context.
+ */
+ private void validateCapabilities(Context context, Context.Capabilities[] required) throws IllegalStateException {
+
+ // collect the name of each missing capability
+ String missing = Stream
+ .of(required)
+ .filter(c -> !context.getCapability(c).isPresent())
+ .map(c -> c.toString())
+ .collect(Collectors.joining(", "));
+
+ if(StringUtils.isNotBlank(missing) || context == null) {
+ throw new IllegalStateException("missing required context: " + missing);
+ }
+ }
+
+ /**
+ * Get an argument from a list of arguments.
+ * @param index The index within the list of arguments.
+ * @param clazz The type expected.
+ * @param args All of the arguments.
+ * @param <T> The type of the argument expected.
+ */
+ private <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+ if(index >= args.size()) {
+ throw new IllegalArgumentException(format("expected at least %d argument(s), found %d", index+1, args.size()));
+ }
+
+ return ConversionUtils.convert(args.get(index), clazz);
+ }
+
+ /**
+ * Creates the ColumnBuilder to use in accessing the profile data.
+ * @param global The global configuration.
+ */
+ private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
+ // the builder is not currently configurable - but should be made so
+ ColumnBuilder columnBuilder;
+
+ if(global.containsKey(PROFILER_COLUMN_FAMILY)) {
+ String columnFamily = (String) global.get(PROFILER_COLUMN_FAMILY);
+ columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+
+ } else {
+ columnBuilder = new ValueOnlyColumnBuilder();
+ }
+
+ return columnBuilder;
+ }
+
+ /**
+ * Creates the RowKeyBuilder to use in accessing the profile data.
+ * @param global The global configuration.
+ */
+ private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
+ // the builder is not currently configurable - but should be made so
+ return new SaltyRowKeyBuilder();
+ }
+
+ /**
+ * Create an HBase table used when accessing HBase.
+ * @param global The global configuration.
+ * @return The HBase table named by the global configuration, or 'profiler' by default.
+ */
+ private HTableInterface getTable(Map<String, Object> global) {
+
+ String tableName = (String) global.getOrDefault(PROFILER_HBASE_TABLE, "profiler");
+ TableProvider provider = getTableProvider(global);
+
+ try {
+ return provider.getTable(HBaseConfiguration.create(), tableName);
+
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Unable to access table: %s", tableName));
+ }
+ }
+
+ /**
+ * Create the TableProvider to use when accessing HBase.
+ * @param global The global configuration.
+ */
+ private TableProvider getTableProvider(Map<String, Object> global) {
+ String clazzName = (String) global.getOrDefault(PROFILER_HBASE_TABLE_PROVIDER, HTableProvider.class.getName());
+
+ TableProvider provider;
+ try {
+ Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(clazzName);
+ provider = clazz.newInstance();
+
+ } catch (Exception e) {
+ provider = new HTableProvider();
+ }
+
+ return provider;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
new file mode 100644
index 0000000..e68d52d
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/GetProfileTest.java
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.FunctionResolverSingleton;
+import org.apache.metron.common.dsl.ParseException;
+import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
+import org.apache.metron.profiler.stellar.StellarExecutor;
+import org.apache.metron.test.mock.MockHTable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.GetProfile.PROFILER_HBASE_TABLE_PROVIDER;
+
+/**
+ * Tests the GetProfile class.
+ */
+public class GetProfileTest {
+
+ private static final String tableName = "profiler";
+ private static final String columnFamily = "P";
+ private StellarExecutor executor;
+ private Map<String, Object> state;
+ private ProfileWriter profileWriter;
+
+ /**
+ * A TableProvider that allows us to mock HBase.
+ */
+ public static class MockTableProvider implements TableProvider, Serializable {
+
+ MockHTable.Provider provider = new MockHTable.Provider();
+
+ @Override
+ public HTableInterface getTable(Configuration config, String tableName) throws IOException {
+ return provider.getTable(config, tableName);
+ }
+ }
+
+ private <T> T run(String expression, Class<T> clazz) {
+ return executor.execute(expression, state, clazz);
+ }
+
+ @Before
+ public void setup() {
+ state = new HashMap<>();
+ final HTableInterface table = MockHTable.Provider.addToCache(tableName, columnFamily);
+
+ // used to write values to be read during testing
+ RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
+ ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+ profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
+
+ // global properties
+ Map<String, Object> global = new HashMap<String, Object>() {{
+ put(PROFILER_HBASE_TABLE, tableName);
+ put(PROFILER_COLUMN_FAMILY, columnFamily);
+ put(PROFILER_HBASE_TABLE_PROVIDER, MockTableProvider.class.getName());
+ }};
+
+ // create the necessary context
+ Context context = new Context.Builder()
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+ .build();
+
+ // initialize the executor with that context
+ executor = new DefaultStellarExecutor();
+ executor.setContext(context);
+
+ // force re-initialization before each test
+ FunctionResolverSingleton.getInstance().reset();
+ }
+
+ /**
+ * Values should be retrievable that have NOT been stored within a group.
+ */
+ @Test
+ public void testWithNoGroups() {
+ final int periodsPerHour = 4;
+ final int expectedValue = 2302;
+ final int hours = 2;
+ final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+ final List<Object> group = Collections.emptyList();
+
+ // setup - write some measurements to be read later
+ final int count = hours * periodsPerHour;
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ profileWriter.write(m, count, group, val -> expectedValue);
+
+ // execute - read the profile values - no groups
+ String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS')";
+ List<Integer> result = run(expr, List.class);
+
+ // validate - expect to read all values from the past 4 hours
+ Assert.assertEquals(count, result.size());
+ }
+
+ /**
+ * Values should be retrievable that have been stored within a 'group'.
+ */
+ @Test
+ public void testWithOneGroup() {
+ final int periodsPerHour = 4;
+ final int expectedValue = 2302;
+ final int hours = 2;
+ final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+ final List<Object> group = Arrays.asList("weekends");
+
+ // setup - write some measurements to be read later
+ final int count = hours * periodsPerHour;
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ profileWriter.write(m, count, group, val -> expectedValue);
+
+ // create a variable that contains the groups to use
+ state.put("groups", group);
+
+ // execute - read the profile values
+ String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', 'weekends')";
+ List<Integer> result = run(expr, List.class);
+
+ // validate - expect to read all values from the past 4 hours
+ Assert.assertEquals(count, result.size());
+ }
+
+ /**
+ * Values should be retrievable that have been stored within multiple groups.
+ */
+ @Test
+ public void testWithTwoGroups() {
+ final int periodsPerHour = 4;
+ final int expectedValue = 2302;
+ final int hours = 2;
+ final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+ final List<Object> group = Arrays.asList("weekdays", "tuesday");
+
+ // setup - write some measurements to be read later
+ final int count = hours * periodsPerHour;
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ profileWriter.write(m, count, group, val -> expectedValue);
+
+ // create a variable that contains the groups to use
+ state.put("groups", group);
+
+ // execute - read the profile values
+ String expr = "PROFILE_GET('profile1', 'entity1', 4, 'HOURS', 'weekdays', 'tuesday')";
+ List<Integer> result = run(expr, List.class);
+
+ // validate - expect to read all values from the past 4 hours
+ Assert.assertEquals(count, result.size());
+ }
+
+ /**
+ * Initialization should fail if the required context values are missing.
+ */
+ @Test(expected = ParseException.class)
+ public void testMissingContext() {
+ Context empty = Context.EMPTY_CONTEXT();
+
+ // 'unset' the context that was created during setup()
+ executor.setContext(empty);
+
+ // force re-initialization with no context
+ FunctionResolverSingleton.getInstance().initialize(empty);
+
+ // validate - function should be unable to initialize
+ String expr = "PROFILE_GET('profile1', 'entity1', 1000, 'SECONDS', groups)";
+ run(expr, List.class);
+ }
+
+ /**
+ * If the time horizon specified does not include any profile measurements, then
+ * none should be returned.
+ */
+ @Test
+ public void testOutsideTimeHorizon() {
+ final int periodsPerHour = 4;
+ final int expectedValue = 2302;
+ final int hours = 2;
+ final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
+ final List<Object> group = Collections.emptyList();
+
+ // setup - write a single value from 2 hours ago
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ profileWriter.write(m, 1, group, val -> expectedValue);
+
+ // create a variable that contains the groups to use
+ state.put("groups", group);
+
+ // execute - read the profile values
+ String expr = "PROFILE_GET('profile1', 'entity1', 4, 'SECONDS')";
+ List<Integer> result = run(expr, List.class);
+
+ // validate - there should be no values from only 4 seconds ago
+ Assert.assertEquals(0, result.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
index 7e13f3b..087bffa 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -23,20 +23,15 @@ package org.apache.metron.profiler.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.hbase.client.HBaseClient;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
import org.apache.metron.profiler.stellar.DefaultStellarExecutor;
import org.apache.metron.profiler.stellar.StellarExecutor;
-import org.apache.storm.hbase.common.ColumnList;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -52,10 +47,11 @@ import static org.junit.Assert.assertEquals;
/**
* Tests the HBaseProfilerClient.
*
- * The naming used in this test attempts to be as similar to how the 'groupBy' functionality might be used 'in
- * the wild'. This test involves reading and writing two separate groups originating from the same Profile and
- * Entity. There is a 'weekdays' group which contains all measurements taken on weekdays. There is also a
- * 'weekend' group which contains all measurements taken on weekends.
+ * The naming used in this test attempts to be as similar to how the 'groupBy'
+ * functionality might be used 'in the wild'. This test involves reading and
+ * writing two separate groups originating from the same Profile and Entity.
+ * There is a 'weekdays' group which contains all measurements taken on weekdays.
+ * There is also a 'weekend' group which contains all measurements taken on weekends.
*/
public class HBaseProfilerClientTest {
@@ -63,12 +59,10 @@ public class HBaseProfilerClientTest {
private static final String columnFamily = "P";
private HBaseProfilerClient client;
- private HBaseClient hbaseClient;
- private RowKeyBuilder rowKeyBuilder;
- private ColumnBuilder columnBuilder;
private HTableInterface table;
private StellarExecutor executor;
private static HBaseTestingUtility util;
+ private ProfileWriter profileWriter;
@BeforeClass
public static void startHBase() throws Exception {
@@ -88,70 +82,28 @@ public class HBaseProfilerClientTest {
@Before
public void setup() throws Exception {
- // setup all of the necessary dependencies
table = util.createTable(Bytes.toBytes(tableName), Bytes.toBytes(columnFamily));
- hbaseClient = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
executor = new DefaultStellarExecutor();
- rowKeyBuilder = new SaltyRowKeyBuilder();
- columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+
+ // used to write values to be read during testing
+ RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
+ ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+ profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table);
// what we're actually testing
client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
}
- /**
- * Writes profile measurements that can be used for testing.
- * @param count The number of profile measurements to write.
- * @param profileName Name of the profile.
- * @param entityName Name of the entity.
- * @param value The measurement value.
- * @param periodsPerHour Number of profile periods per hour.
- * @param startTime When measurements should start to be written.
- * @param group The name of the group.
- */
- private void writeMeasurements(int count, String profileName, String entityName, int value, int periodsPerHour, long startTime, List<Object> group) {
-
- // create the first measurement
- ProfileMeasurement m = new ProfileMeasurement(profileName, entityName, startTime, periodsPerHour);
- m.setValue(value);
-
- for(int i=0; i<count; i++) {
-
- // create a measurement for the next profile period
- ProfilePeriod next = m.getPeriod().next();
- m = new ProfileMeasurement(profileName, entityName, next.getTimeInMillis(), periodsPerHour);
- m.setValue(value);
-
- // write the measurement
- write(m, group);
- }
- }
-
@After
public void tearDown() throws Exception {
util.deleteTable(tableName);
}
/**
- * Write a ProfileMeasurement.
- * @param m The ProfileMeasurement to write.
- * @param groups The groups to use when writing the ProfileMeasurement.
- */
- private void write(ProfileMeasurement m, List<Object> groups) {
-
- byte[] rowKey = rowKeyBuilder.rowKey(m, groups);
- ColumnList cols = columnBuilder.columns(m);
-
- List<Mutation> mutations = hbaseClient.constructMutationReq(rowKey, cols, Durability.SKIP_WAL);
- hbaseClient.batchMutate(mutations);
- }
-
- /**
* The client should be able to distinguish between groups and only fetch those in the correct group.
*/
@Test
public void testFetchOneGroup() throws Exception {
-
final int periodsPerHour = 4;
final int expectedValue = 2302;
final int hours = 2;
@@ -159,8 +111,9 @@ public class HBaseProfilerClientTest {
final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
// setup - write two groups of measurements - 'weekends' and 'weekdays'
- writeMeasurements(count, "profile1", "entity1", expectedValue, periodsPerHour, startTime, Arrays.asList("weekdays"));
- writeMeasurements(count, "profile1", "entity1", 0, periodsPerHour, startTime, Arrays.asList("weekends"));
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
+ profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
// execute
List<Integer> results = client.fetch("profile1", "entity1", hours, TimeUnit.HOURS, Integer.class, Arrays.asList("weekdays"));
@@ -184,11 +137,13 @@ public class HBaseProfilerClientTest {
final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
// create two groups of measurements - one on weekdays and one on weekends
- writeMeasurements(count, "profile1", "entity1", expectedValue, periodsPerHour, startTime, Arrays.asList("weekdays"));
- writeMeasurements(count, "profile1", "entity1", 0, periodsPerHour, startTime, Arrays.asList("weekends"));
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
+ profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
// execute
- List<Integer> results = client.fetch("profile1", "entity1", hours, TimeUnit.HOURS, Integer.class, Arrays.asList("does-not-exist"));
+ List<Object> doesNotExist = Arrays.asList("does-not-exist");
+ List<Integer> results = client.fetch("profile1", "entity1", hours, TimeUnit.HOURS, Integer.class, doesNotExist);
// validate
assertEquals(0, results.size());
@@ -200,13 +155,15 @@ public class HBaseProfilerClientTest {
*/
@Test
public void testFetchOutsideTimeWindow() throws Exception {
-
- // setup - create some measurement values from a day ago
final int periodsPerHour = 4;
final int hours = 2;
+ int numberToWrite = hours * periodsPerHour;
final List<Object> group = Arrays.asList("weekends");
final long startTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
- writeMeasurements(hours * periodsPerHour, "profile1", "entity1", 1000, 4, startTime, group);
+
+ // setup - write some values to read later
+ ProfileMeasurement m = new ProfileMeasurement("profile1", "entity1", startTime, periodsPerHour);
+ profileWriter.write(m, numberToWrite, group, val -> 1000);
// execute
List<Integer> results = client.fetch("profile1", "entity1", 2, TimeUnit.MILLISECONDS, Integer.class, group);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
new file mode 100644
index 0000000..95be44c
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.profiler.client;
+
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.storm.hbase.common.ColumnList;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Writes ProfileMeasurement values that can be read during automated testing.
+ */
+public class ProfileWriter {
+ private final RowKeyBuilder rowKeyBuilder;
+ private final ColumnBuilder columnBuilder;
+ private final HBaseClient hbaseClient;
+ private final HBaseProfilerClient client;
+
+ /**
+ * Creates a writer that stores measurements in the given table.
+ * @param rowKeyBuilder Builds the row key for each measurement that is written.
+ * @param columnBuilder Builds the columns for each measurement that is written.
+ * @param table The table that the measurements are written to.
+ */
+ public ProfileWriter(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder, HTableInterface table) {
+ this.rowKeyBuilder = rowKeyBuilder;
+ this.columnBuilder = columnBuilder;
+ this.hbaseClient = new HBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString());
+ this.client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder);
+ }
+
+ /**
+ * Writes profile measurements that can be used for testing.
+ *
+ * @param prototype A prototype for the types of ProfileMeasurements that should be written.
+ * @param count The number of profile measurements to write.
+ * @param group The groups used to build the row key of each measurement.
+ * @param valueGenerator A function that consumes the previous ProfileMeasurement value and produces the next.
+ */
+ public void write(ProfileMeasurement prototype, int count, List<Object> group, Function<Object, Object> valueGenerator) {
+ ProfileMeasurement m = prototype;
+ for(int i=0; i<count; i++) {
+ // generate the next value from the previous value, before 'm' is replaced by a new, valueless measurement
+ Object nextValue = valueGenerator.apply(m.getValue());
+
+ // create a measurement for the next profile period to be written
+ ProfilePeriod next = m.getPeriod().next();
+ m = new ProfileMeasurement(
+ prototype.getProfileName(),
+ prototype.getEntity(),
+ next.getTimeInMillis(),
+ prototype.getPeriodsPerHour());
+ m.setValue(nextValue);
+
+ // write the measurement
+ write(m, group);
+ }
+ }
+
+ /**
+ * Write a ProfileMeasurement.
+ * @param m The ProfileMeasurement to write.
+ * @param groups The groups to use when writing the ProfileMeasurement.
+ */
+ private void write(ProfileMeasurement m, List<Object> groups) {
+ byte[] rowKey = rowKeyBuilder.rowKey(m, groups);
+ ColumnList cols = columnBuilder.columns(m);
+ List<Mutation> mutations = hbaseClient.constructMutationReq(rowKey, cols, Durability.SKIP_WAL);
+ hbaseClient.batchMutate(mutations);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
index bea0b34..0c94879 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/ProfileMeasurement.java
@@ -51,6 +51,11 @@ public class ProfileMeasurement {
private List<String> groupBy;
/**
+ * The number of profile periods per hour.
+ */
+ private int periodsPerHour;
+
+ /**
* The period in which the ProfileMeasurement was taken.
*/
private ProfilePeriod period;
@@ -64,6 +69,7 @@ public class ProfileMeasurement {
public ProfileMeasurement(String profileName, String entity, long epochMillis, int periodsPerHour) {
this.profileName = profileName;
this.entity = entity;
+ this.periodsPerHour = periodsPerHour;
this.period = new ProfilePeriod(epochMillis, periodsPerHour);
}
@@ -95,13 +101,17 @@ public class ProfileMeasurement {
this.groupBy = groupBy;
}
+ public int getPeriodsPerHour() {
+ return periodsPerHour;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ProfileMeasurement that = (ProfileMeasurement) o;
-
+ if (periodsPerHour != that.periodsPerHour) return false;
if (profileName != null ? !profileName.equals(that.profileName) : that.profileName != null) return false;
if (entity != null ? !entity.equals(that.entity) : that.entity != null) return false;
if (value != null ? !value.equals(that.value) : that.value != null) return false;
@@ -116,6 +126,7 @@ public class ProfileMeasurement {
result = 31 * result + (entity != null ? entity.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
result = 31 * result + (groupBy != null ? groupBy.hashCode() : 0);
+ result = 31 * result + periodsPerHour;
result = 31 * result + (period != null ? period.hashCode() : 0);
return result;
}
@@ -127,6 +138,7 @@ public class ProfileMeasurement {
", entity='" + entity + '\'' +
", value=" + value +
", groupBy=" + groupBy +
+ ", periodsPerHour=" + periodsPerHour +
", period=" + period +
'}';
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
index 79c046a..2cd7182 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileBuilderBolt.java
@@ -112,6 +112,7 @@ public class ProfileBuilderBolt extends ConfiguredProfilerBolt {
protected void initializeStellar() {
Context context = new Context.Builder()
.with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
.build();
StellarFunctions.initialize(context);
executor.setContext(context);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
index d389410..5d97e4d 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/bolt/ProfileSplitterBolt.java
@@ -82,8 +82,9 @@ public class ProfileSplitterBolt extends ConfiguredProfilerBolt {
protected void initializeStellar() {
Context context = new Context.Builder()
- .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
- .build();
+ .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
+ .build();
StellarFunctions.initialize(context);
executor.setContext(context);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
index ae786f2..474ac73 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/DefaultStellarExecutor.java
@@ -21,6 +21,7 @@
package org.apache.metron.profiler.stellar;
import org.apache.metron.common.dsl.Context;
+import org.apache.metron.common.dsl.FunctionResolver;
import org.apache.metron.common.dsl.MapVariableResolver;
import org.apache.metron.common.dsl.ParseException;
import org.apache.metron.common.dsl.StellarFunctions;
@@ -44,7 +45,7 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
/**
* Provides additional context for initializing certain Stellar functions. For
- * example, references to find Zookeeper or HBase.
+ * example, references to Zookeeper or HBase.
*/
private Context context;
@@ -61,41 +62,50 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
this.state = new HashMap<>(initialState);
}
+ /**
+ * The current state of the Stellar execution environment.
+ */
@Override
public Map<String, Object> getState() {
return new HashMap<>(state);
}
/**
- * Execute an expression and assign the result to a variable.
+ * Execute an expression and assign the result to a variable. The variable is maintained
+ * in the context of this executor and is available to all subsequent expressions.
*
- * @param variable The variable name to assign to.
- * @param expression The expression to execute.
- * @param message The message that provides additional context for the expression.
+ * @param variable The name of the variable to assign to.
+ * @param expression The expression to execute.
+ * @param transientState Additional state available to the expression. This most often represents
+ * the values available to the expression from an individual message. The state
+ * maps a variable name to a variable's value.
*/
@Override
- public void assign(String variable, String expression, Map<String, Object> message) {
- Object result = execute(expression, message);
+ public void assign(String variable, String expression, Map<String, Object> transientState) {
+ Object result = execute(expression, transientState);
state.put(variable, result);
}
/**
- * Execute a Stellar expression and returns the result.
+ * Execute a Stellar expression and return the result. The internal state of the executor
+ * is not modified.
*
- * @param expr The expression to execute.
- * @param message The message that is accessible when Stellar is executed.
- * @param clazz The expected class of the expression's result.
- * @param <T> The expected class of the expression's result.
+ * @param expression The expression to execute.
+ * @param state Additional state available to the expression. This most often represents
+ * the values available to the expression from an individual message. The state
+ * maps a variable name to a variable's value.
+ * @param clazz The expected type of the expression's result.
+ * @param <T> The expected type of the expression's result.
*/
@Override
- public <T> T execute(String expr, Map<String, Object> message, Class<T> clazz) {
- Object resultObject = execute(expr, message);
+ public <T> T execute(String expression, Map<String, Object> state, Class<T> clazz) {
+ Object resultObject = execute(expression, state);
// perform type conversion, if necessary
T result = ConversionUtils.convert(resultObject, clazz);
- if(result == null) {
+ if (result == null) {
throw new IllegalArgumentException(String.format("Unexpected type: expected=%s, actual=%s, expression=%s",
- clazz.getSimpleName(), resultObject.getClass().getSimpleName(), expr));
+ clazz.getSimpleName(), resultObject.getClass().getSimpleName(), expression));
}
return result;
@@ -109,6 +119,7 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
/**
* Sets the Context for the Stellar execution environment. This provides global data used
* to initialize Stellar functions.
+ *
* @param context The Stellar context.
*/
@Override
@@ -119,17 +130,15 @@ public class DefaultStellarExecutor implements StellarExecutor, Serializable {
/**
* Execute a Stellar expression.
*
- * @param expr The expression to execute.
- * @param msg The message that is accessible when Stellar is executed.
+ * @param expression The expression to execute.
+ * @param transientState Additional state available to the expression. This most often represents
+ * the values available to the expression from an individual message. The state
+ * maps a variable name to a variable's value.
*/
- private Object execute(String expr, Map<String, Object> msg) {
- try {
- VariableResolver resolver = new MapVariableResolver(state, msg);
- StellarProcessor processor = new StellarProcessor();
- return processor.parse(expr, resolver, StellarFunctions.FUNCTION_RESOLVER(), context);
-
- } catch (ParseException e) {
- throw new ParseException(String.format("Bad expression: expr=%s, msg=%s, state=%s", expr, msg, state));
- }
+ private Object execute(String expression, Map<String, Object> transientState) {
+ FunctionResolver functionResolver = StellarFunctions.FUNCTION_RESOLVER();
+ VariableResolver variableResolver = new MapVariableResolver(state, transientState);
+ StellarProcessor processor = new StellarProcessor();
+ return processor.parse(expression, variableResolver, functionResolver, context);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
index 66e3ad1..869db42 100644
--- a/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
+++ b/metron-analytics/metron-profiler/src/main/java/org/apache/metron/profiler/stellar/StellarExecutor.java
@@ -21,37 +21,38 @@
package org.apache.metron.profiler.stellar;
import org.apache.metron.common.dsl.Context;
-import org.json.simple.JSONObject;
import java.util.Map;
/**
* Executes Stellar expressions and maintains state across multiple invocations.
- *
- * There are two sets of functions in Stellar currently. One can be executed with
- * a PredicateProcessor and the other a TransformationProcessor. This interface
- * abstracts away that complication.
*/
public interface StellarExecutor {
/**
- * Execute an expression and assign the result to a variable.
+ * Execute an expression and assign the result to a variable. The variable is maintained
+ * in the context of this executor and is available to all subsequent expressions.
*
- * @param variable The name of the variable to assign to.
+ * @param variable The name of the variable to assign to.
* @param expression The expression to execute.
- * @param message The message that provides additional context for the expression.
+ * @param state Additional state available to the expression. This most often represents
+ * the values available to the expression from an individual message. The state
+ * maps a variable name to a variable's value.
*/
- void assign(String variable, String expression, Map<String, Object> message);
+ void assign(String variable, String expression, Map<String, Object> state);
/**
- * Execute a Stellar expression and return the result.
+ * Execute a Stellar expression and return the result. The internal state of the executor
+ * is not modified.
*
* @param expression The expression to execute.
- * @param message A map of values, most often the JSON message itself, that is accessible within Stellar.
- * @param clazz The expected class of the expression's result.
- * @param <T> The expected class of the expression's result.
+ * @param state Additional state available to the expression. This most often represents
+ * the values available to the expression from an individual message. The state
+ * maps a variable name to a variable's value.
+ * @param clazz The expected type of the expression's result.
+ * @param <T> The expected type of the expression's result.
*/
- <T> T execute(String expression, Map<String, Object> message, Class<T> clazz);
+ <T> T execute(String expression, Map<String, Object> state, Class<T> clazz);
/**
* The current state of the Stellar execution environment.
@@ -66,6 +67,7 @@ public interface StellarExecutor {
/**
* Sets the Context for the Stellar execution environment. This provides global data used
* to initialize Stellar functions.
+ *
* @param context The Stellar context.
*/
void setContext(Context context);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
index 7a5cbcf..9e2a66a 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/stellar/DefaultStellarExecutorTest.java
@@ -28,6 +28,11 @@ import org.json.simple.parser.ParseException;
import org.junit.Before;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
@@ -158,4 +163,18 @@ public class DefaultStellarExecutorTest {
executor.execute("2", message, Short.class);
executor.execute("2", message, Long.class);
}
+
+ /**
+ * The executor must be serializable.
+ */
+ @Test
+ public void testSerializable() throws Exception {
+
+ // serialize
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ new ObjectOutputStream(bytes).writeObject(executor);
+
+ // deserialize - success when no exceptions
+ new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
index 0731b00..aa8c9a2 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/Context.java
@@ -31,7 +31,8 @@ public class Context implements Serializable {
public enum Capabilities {
HBASE_PROVIDER,
ZOOKEEPER_CLIENT,
- SERVICE_DISCOVERER
+ SERVICE_DISCOVERER,
+ GLOBAL_CONFIG
}
public static class Builder {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java
index 0e90b84..03fdc5f 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolver.java
@@ -19,8 +19,31 @@ package org.apache.metron.common.dsl;
import java.util.function.Function;
+/**
+ * Responsible for function resolution in Stellar.
+ */
public interface FunctionResolver extends Function<String, StellarFunction> {
+
+ /**
+ * Provides metadata about each Stellar function that is resolvable.
+ */
Iterable<StellarFunctionInfo> getFunctionInfo();
+
+ /**
+ * The names of all Stellar functions that are resolvable.
+ */
Iterable<String> getFunctions();
+
+ /**
+ * Initialize the function resolver.
+ * @param context Context used to initialize.
+ */
void initialize(Context context);
+
+ /**
+ * A 'factory reset' of the function resolver.
+ *
+ * Intended primarily for testing purposes.
+ */
+ void reset();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java
index c631878..ed57db0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/FunctionResolverSingleton.java
@@ -43,18 +43,26 @@ public class FunctionResolverSingleton implements FunctionResolver {
return INSTANCE;
}
-
-
+ /**
+ * Provides metadata about each Stellar function that is resolvable.
+ */
@Override
public Iterable<StellarFunctionInfo> getFunctionInfo() {
return _getFunctions().values();
}
+ /**
+ * The names of all Stellar functions that are resolvable.
+ */
@Override
public Iterable<String> getFunctions() {
return _getFunctions().keySet();
}
+ /**
+ * Initialize the function resolver.
+ * @param context Context used to initialize.
+ */
@Override
public void initialize(Context context) {
//forces a load of the stellar functions.
@@ -62,6 +70,22 @@ public class FunctionResolverSingleton implements FunctionResolver {
}
/**
+ * A 'factory reset' of the function resolver.
+ *
+ * Intended primarily for testing purposes.
+ */
+ @Override
+ public void reset() {
+ lock.writeLock().lock();
+ try {
+ isInitialized.set(false);
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
* This allows the lazy loading of the functions. We do not want to take a multi-second hit to analyze the full classpath
* every time a unit test starts up. That would cause the runtime of things to blow right up. Instead, we only want
* to take the hit if a function is actually called from a stellar expression.
@@ -148,8 +172,6 @@ public class FunctionResolverSingleton implements FunctionResolver {
return ClasspathHelper.forManifest(ClasspathHelper.forClassLoader(classLoaders));
}
-
-
private static Map.Entry<String, StellarFunctionInfo> create(Class<? extends StellarFunction> stellarClazz) {
String fqn = getNameFromAnnotation(stellarClazz);
if(fqn == null) {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java
index 8c9745f..e0065e8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/dsl/StellarFunctionInfo.java
@@ -20,12 +20,36 @@ package org.apache.metron.common.dsl;
import java.util.Arrays;
import java.util.List;
+/**
+ * Describes a Stellar function.
+ */
public class StellarFunctionInfo {
- String description;
+
+ /**
+ * The name of the function.
+ */
String name;
+
+ /**
+ * A description of the function. Used for documentation purposes.
+ */
+ String description;
+
+ /**
+ * A description of what the function returns. Used for documentation purposes.
+ */
+ String returns;
+
+ /**
+ * The function parameters. Used for documentation purposes.
+ */
String[] params;
+
+ /**
+ * The actual function that can be executed.
+ */
StellarFunction function;
- String returns;
+
public StellarFunctionInfo(String description, String name, String[] params, String returns, StellarFunction function) {
this.description = description;
this.name = name;
@@ -34,7 +58,9 @@ public class StellarFunctionInfo {
this.returns = returns;
}
- public String getReturns() { return returns;}
+ public String getReturns() {
+ return returns;
+ }
public String getDescription() {
return description;
@@ -65,7 +91,6 @@ public class StellarFunctionInfo {
// Probably incorrect - comparing Object[] arrays with Arrays.equals
if (!Arrays.equals(getParams(), that.getParams())) return false;
return getReturns() != null ? getReturns().equals(that.getReturns()) : that.getReturns() == null;
-
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/ef3e9fa5/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
index d294da7..266badf 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
@@ -3,14 +3,20 @@
"es.ip": "localhost",
"es.port": 9300,
"es.date.format": "yyyy.MM.dd.HH",
+
"solr.zookeeper": "localhost:2181",
"solr.collection": "metron",
"solr.numShards": 1,
"solr.replicationFactor": 1,
+
"fieldValidations" : [
{
- "input" : [ "src_ip_addr", "dst_ip_addr"]
- ,"validation" : "IP"
+ "input" : [ "src_ip_addr", "dst_ip_addr"],
+ "validation" : "IP"
}
- ]
+ ],
+
+ "profiler.hbase.table": "profiler",
+ "profiler.column.family": "P",
+ "profiler.hbase.table.provider": "org.apache.metron.hbase.HTableProvider"
}
\ No newline at end of file