You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/08/08 20:40:49 UTC
[metron] branch feature/METRON-2088-support-hdp-3.1 updated:
METRON-2177 Upgrade Profiler for HBase 2.0.2 (nickwallen) closes
apache/metron#1458
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
new 1e1afc3 METRON-2177 Upgrade Profiler for HBase 2.0.2 (nickwallen) closes apache/metron#1458
1e1afc3 is described below
commit 1e1afc33332ac3b4618a8f19ffcb02d65ad4b369
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Thu Aug 8 16:40:26 2019 -0400
METRON-2177 Upgrade Profiler for HBase 2.0.2 (nickwallen) closes apache/metron#1458
---
.../profiler/client/HBaseProfilerClient.java | 117 +++--
.../client/HBaseProfilerClientFactory.java | 101 ++++
.../metron/profiler/client/ProfilerClient.java | 3 +-
.../profiler/client/ProfilerClientFactories.java | 45 ++
.../profiler/client/ProfilerClientFactory.java | 36 ++
.../metron/profiler/client/stellar/GetProfile.java | 255 +++++-----
.../client/stellar/ProfilerClientConfig.java | 8 +-
.../metron/profiler/client/stellar/Util.java | 2 +-
.../profiler/client/stellar/VerboseProfile.java | 165 +++----
.../client/HBaseProfilerClientFactoryTest.java | 108 +++++
.../profiler/client/HBaseProfilerClientTest.java | 287 ++++++------
.../metron/profiler/client/ProfileWriter.java | 125 -----
.../profiler/client/stellar/GetProfileTest.java | 512 ++++++---------------
.../client/stellar/VerboseProfileTest.java | 294 ++++++------
.../metron/profiler/hbase/SaltyRowKeyBuilder.java | 8 +
.../profiler/hbase/ValueOnlyColumnBuilder.java | 3 -
.../profiler/DefaultMessageDistributorTest.java | 2 -
metron-analytics/metron-profiler-spark/pom.xml | 6 +
.../metron/profiler/spark/BatchProfiler.java | 6 +-
.../metron/profiler/spark/BatchProfilerConfig.java | 7 +-
.../spark/function/HBaseWriterFunction.java | 159 ++++---
.../spark/BatchProfilerIntegrationTest.java | 44 +-
.../spark/function/HBaseWriterFunctionTest.java | 96 ++--
metron-analytics/metron-profiler-storm/pom.xml | 10 +
.../src/main/config/profiler.properties | 2 +
.../src/main/flux/profiler/remote.yaml | 9 +-
.../org/apache/metron/hbase/bolt/HBaseBolt.java | 101 ++--
.../apache/metron/hbase/bolt/HBaseBoltTest.java | 19 +-
.../java/org/apache/metron/hbase/bolt/Widget.java | 84 ++++
.../org/apache/metron/hbase/bolt/WidgetMapper.java | 71 +++
.../storm/integration/ProfilerIntegrationTest.java | 50 +-
.../package/templates/profiler.properties.j2 | 2 +
.../metron/hbase/client/HBaseTableClient.java | 2 +-
.../metron/stellar/dsl/StellarFunctionInfo.java | 6 +-
.../functions/resolver/BaseFunctionResolver.java | 18 +-
.../dsl/functions/resolver/FunctionResolver.java | 17 +-
.../functions/resolver/SimpleFunctionResolver.java | 13 +-
37 files changed, 1461 insertions(+), 1332 deletions(-)
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
index 2e537da..b9d0384 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -20,11 +20,11 @@
package org.apache.metron.profiler.client;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.apache.metron.hbase.client.HBaseClient;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.hbase.ColumnBuilder;
@@ -44,7 +44,7 @@ public class HBaseProfilerClient implements ProfilerClient {
/**
* Used to access the profile data stored in HBase.
*/
- private HTableInterface table;
+ private HBaseClient hbaseClient;
/**
* Generates the row keys necessary to scan HBase.
@@ -52,19 +52,22 @@ public class HBaseProfilerClient implements ProfilerClient {
private RowKeyBuilder rowKeyBuilder;
/**
- * Knows how profiles are organized in HBase.
+ * Knows how profiles are organized by columns in HBase.
*/
private ColumnBuilder columnBuilder;
+ /**
+ * The period duration in milliseconds for the profiles that will be read by this client.
+ */
private long periodDurationMillis;
- public HBaseProfilerClient(HTableInterface table,
+ public HBaseProfilerClient(HBaseClient hbaseClient,
RowKeyBuilder rowKeyBuilder,
ColumnBuilder columnBuilder,
long periodDurationMillis) {
- setTable(table);
- setRowKeyBuilder(rowKeyBuilder);
- setColumnBuilder(columnBuilder);
+ this.rowKeyBuilder = rowKeyBuilder;
+ this.columnBuilder = columnBuilder;
+ this.hbaseClient = hbaseClient;
this.periodDurationMillis = periodDurationMillis;
}
@@ -82,7 +85,13 @@ public class HBaseProfilerClient implements ProfilerClient {
* @return A list of values.
*/
@Override
- public <T> List<ProfileMeasurement> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, long start, long end, Optional<T> defaultValue) {
+ public <T> List<ProfileMeasurement> fetch(Class<T> clazz,
+ String profile,
+ String entity,
+ List<Object> groups,
+ long start,
+ long end,
+ Optional<T> defaultValue) {
List<ProfilePeriod> periods = ProfilePeriod.visitPeriods(
start,
end,
@@ -93,6 +102,8 @@ public class HBaseProfilerClient implements ProfilerClient {
return fetch(clazz, profile, entity, groups, periods, defaultValue);
}
+
+
/**
* Fetch the values stored in a profile based on a set of timestamps.
*
@@ -105,7 +116,12 @@ public class HBaseProfilerClient implements ProfilerClient {
* @return A list of values.
*/
@Override
- public <T> List<ProfileMeasurement> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods, Optional<T> defaultValue) {
+ public <T> List<ProfileMeasurement> fetch(Class<T> clazz,
+ String profile,
+ String entity,
+ List<Object> groups,
+ Iterable<ProfilePeriod> periods,
+ Optional<T> defaultValue) {
// create a list of profile measurements that need fetched
List<ProfileMeasurement> toFetch = new ArrayList<>();
for(ProfilePeriod period: periods) {
@@ -120,59 +136,76 @@ public class HBaseProfilerClient implements ProfilerClient {
return doFetch(toFetch, clazz, defaultValue);
}
- private <T> List<ProfileMeasurement> doFetch(List<ProfileMeasurement> measurements, Class<T> clazz, Optional<T> defaultValue) {
+ @Override
+ public void close() throws IOException {
+ if(hbaseClient != null) {
+ hbaseClient.close();
+ }
+ }
+
+ private <T> List<ProfileMeasurement> doFetch(List<ProfileMeasurement> measurements,
+ Class<T> clazz,
+ Optional<T> defaultValue) {
List<ProfileMeasurement> values = new ArrayList<>();
- // build the gets for HBase
+ // define which columns need fetched
byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
- List<Get> gets = new ArrayList<>();
+ HBaseProjectionCriteria.ColumnMetaData column = new HBaseProjectionCriteria.ColumnMetaData(columnFamily, columnQualifier);
+ HBaseProjectionCriteria criteria = new HBaseProjectionCriteria().addColumn(column);
+
for(ProfileMeasurement measurement: measurements) {
byte[] rowKey = rowKeyBuilder.rowKey(measurement);
- Get get = new Get(rowKey).addColumn(columnFamily, columnQualifier);
- gets.add(get);
+ hbaseClient.addGet(rowKey, criteria);
}
// query HBase
- try {
- Result[] results = table.get(gets);
- for(int i = 0; i < results.length; ++i) {
- Result result = results[i];
- ProfileMeasurement measurement = measurements.get(i);
-
- boolean exists = result.containsColumn(columnFamily, columnQualifier);
- if(exists) {
- // value found
- byte[] value = result.getValue(columnFamily, columnQualifier);
- measurement.withProfileValue(SerDeUtils.fromBytes(value, clazz));
- values.add(measurement);
-
- } else if(defaultValue.isPresent()) {
- // no value found, use default value provided
- measurement.withProfileValue(defaultValue.get());
- values.add(measurement);
-
- } else {
- // no value found and no default provided. nothing to do
- }
+ Result[] results = hbaseClient.getAll();
+ for(int i = 0; i < results.length; ++i) {
+ Result result = results[i];
+ ProfileMeasurement measurement = measurements.get(i);
+
+ boolean exists = result.containsColumn(columnFamily, columnQualifier);
+ if(exists) {
+ // value found
+ byte[] value = result.getValue(columnFamily, columnQualifier);
+ measurement.withProfileValue(SerDeUtils.fromBytes(value, clazz));
+ values.add(measurement);
+
+ } else if(defaultValue.isPresent()) {
+ // no value found, use default value provided
+ measurement.withProfileValue(defaultValue.get());
+ values.add(measurement);
+
+ } else {
+ // no value found and no default provided. nothing to do
}
- } catch(IOException e) {
- throw new RuntimeException(e);
}
return values;
}
+ protected HBaseClient getHbaseClient() {
+ return hbaseClient;
+ }
- public void setTable(HTableInterface table) {
- this.table = table;
+ protected RowKeyBuilder getRowKeyBuilder() {
+ return rowKeyBuilder;
}
- public void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
+ protected void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
this.rowKeyBuilder = rowKeyBuilder;
}
- public void setColumnBuilder(ColumnBuilder columnBuilder) {
+ protected ColumnBuilder getColumnBuilder() {
+ return columnBuilder;
+ }
+
+ protected void setColumnBuilder(ColumnBuilder columnBuilder) {
this.columnBuilder = columnBuilder;
}
+
+ protected long getPeriodDurationMillis() {
+ return periodDurationMillis;
+ }
}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClientFactory.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClientFactory.java
new file mode 100644
index 0000000..9d83100
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClientFactory.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClientFactory;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_CONNECTION_FACTORY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationInMillis;
+
+/**
+ * Creates an {@link HBaseProfilerClient}.
+ */
+public class HBaseProfilerClientFactory implements ProfilerClientFactory {
+
+ /**
+ * The factory that provides the {@link HBaseClient} that is used to interact with HBase.
+ */
+ private HBaseClientFactory hBaseClientFactory;
+
+ public HBaseProfilerClientFactory() {
+ this(new HBaseTableClientFactory());
+ }
+
+ public HBaseProfilerClientFactory(HBaseClientFactory hBaseClientFactory) {
+ this.hBaseClientFactory = hBaseClientFactory;
+ }
+
+ @Override
+ public HBaseProfilerClient create(Map<String, Object> globals) {
+ // create the hbase client
+ String tableName = PROFILER_HBASE_TABLE.get(globals, String.class);
+ HBaseConnectionFactory connFactory = getConnectionFactory(globals);
+ Configuration config = HBaseConfiguration.create();
+ HBaseClient hbaseClient = hBaseClientFactory.create(connFactory, config, tableName);
+
+ // create the profiler client
+ RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(globals);
+ ColumnBuilder columnBuilder = getColumnBuilder(globals);
+ long periodDuration = getPeriodDurationInMillis(globals);
+ return new HBaseProfilerClient(hbaseClient, rowKeyBuilder, columnBuilder, periodDuration);
+ }
+
+ /**
+ * Creates the ColumnBuilder to use in accessing the profile data.
+ * @param global The global configuration.
+ */
+ private static ColumnBuilder getColumnBuilder(Map<String, Object> global) {
+ String columnFamily = PROFILER_COLUMN_FAMILY.get(global, String.class);
+ return new ValueOnlyColumnBuilder(columnFamily);
+ }
+
+ /**
+ * Creates the ColumnBuilder to use in accessing the profile data.
+ * @param global The global configuration.
+ */
+ private static RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
+ Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
+ return new SaltyRowKeyBuilder(saltDivisor, getPeriodDurationInMillis(global), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Create the {@link HBaseConnectionFactory} to use when accessing HBase.
+ * @param global The global configuration.
+ */
+ private static HBaseConnectionFactory getConnectionFactory(Map<String, Object> global) {
+ String clazzName = PROFILER_HBASE_CONNECTION_FACTORY.get(global, String.class);
+ return HBaseConnectionFactory.byName(clazzName);
+ }
+}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
index 161575f..7283d21 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
@@ -23,13 +23,14 @@ package org.apache.metron.profiler.client;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
+import java.io.Closeable;
import java.util.List;
import java.util.Optional;
/**
* An interface for a client capable of retrieving the profile data that has been persisted by the Profiler.
*/
-public interface ProfilerClient {
+public interface ProfilerClient extends Closeable {
/**
* Fetch the values stored in a profile based on a start and end timestamp.
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactories.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactories.java
new file mode 100644
index 0000000..5fb4145
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactories.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client;
+
+import java.util.Map;
+
+/**
+ * Enumerates the available {@link ProfilerClientFactory} implementations.
+ */
+public enum ProfilerClientFactories implements ProfilerClientFactory {
+
+ /**
+ * The default factory that returns a {@link ProfilerClient} that interacts
+ * with profiles stored in HBase.
+ */
+ DEFAULT(new HBaseProfilerClientFactory());
+
+ private ProfilerClientFactory factory;
+
+ ProfilerClientFactories(ProfilerClientFactory factory) {
+ this.factory = factory;
+ }
+
+ @Override
+ public ProfilerClient create(Map<String, Object> globals) {
+ return factory.create(globals);
+ }
+}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactory.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactory.java
new file mode 100644
index 0000000..7f659a7
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactory.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client;
+
+import java.util.Map;
+
+/**
+ * Responsible for creating a {@link ProfilerClient}.
+ */
+public interface ProfilerClientFactory {
+
+ /**
+ * Create a {@link ProfilerClient}.
+ *
+ * @param globals The global configuration.
+ * @return The {@link ProfilerClient}.
+ */
+ ProfilerClient create(Map<String, Object> globals);
+}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index a0e2bdf..5a45571 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -20,18 +20,12 @@
package org.apache.metron.profiler.client.stellar;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
-import org.apache.metron.profiler.client.HBaseProfilerClient;
+import org.apache.metron.profiler.client.HBaseProfilerClientFactory;
import org.apache.metron.profiler.client.ProfilerClient;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.client.ProfilerClientFactories;
+import org.apache.metron.profiler.client.ProfilerClientFactory;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.Stellar;
@@ -42,21 +36,16 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_DEFAULT_VALUE;
import static org.apache.metron.profiler.client.stellar.Util.getArg;
import static org.apache.metron.profiler.client.stellar.Util.getEffectiveConfig;
-import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationInMillis;
+import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
/**
* A Stellar function that can retrieve data contained within a Profile.
@@ -91,105 +80,135 @@ import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationIn
name="GET",
description="Retrieves a series of values from a stored profile.",
params={
- "profile - The name of the profile.",
- "entity - The name of the entity.",
- "periods - The list of profile periods to fetch. Use PROFILE_WINDOW or PROFILE_FIXED.",
- "groups - Optional - The groups to retrieve. Must correspond to the 'groupBy' " +
- "list used during profile creation. Defaults to an empty list, meaning no groups.",
- "config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
- "of the same name. Default is the empty Map, meaning no overrides."
+ "profile - The name of the profile.",
+ "entity - The name of the entity.",
+ "periods - The list of profile periods to fetch. Use PROFILE_WINDOW or PROFILE_FIXED.",
+ "groups - Optional - The groups to retrieve. Must correspond to the 'groupBy' " +
+ "list used during profile creation. Defaults to an empty list, meaning no groups.",
+ "config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
+ "of the same name. Default is the empty Map, meaning no overrides."
},
returns="The selected profile measurements."
)
public class GetProfile implements StellarFunction {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final int PROFILE_ARG_INDEX = 0;
+ private static final int ENTITY_ARG_INDEX = 1;
+ private static final int PERIOD_ARG_INDEX = 2;
+ private static final int GROUPS_ARG_INDEX = 3;
+ private static final int CONFIG_OVERRIDES_ARG_INDEX = 4;
/**
- * Cached client that can retrieve profile values.
+ * Allows the function to retrieve persisted {@link ProfileMeasurement} values.
*/
- private ProfilerClient client;
+ private ProfilerClient profilerClient;
/**
- * Cached value of config map actually used to construct the previously cached client.
+ * Creates the {@link ProfilerClient} used by this function.
*/
- private Map<String, Object> cachedConfigMap = new HashMap<String, Object>(6);
+ private ProfilerClientFactory profilerClientFactory;
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ /**
+ * Last known global configuration used to create the {@link ProfilerClient}. If the
+ * global configuration changes, a new {@link ProfilerClient} needs to be constructed.
+ */
+ private Map<String, Object> lastKnownGlobals = new HashMap<>();
/**
- * Initialization. No longer need to do anything in initialization,
- * as all setup is done lazily and cached.
+ * The default constructor used during Stellar function resolution.
*/
- @Override
- public void initialize(Context context) {
+ public GetProfile() {
+ this(ProfilerClientFactories.DEFAULT);
}
/**
- * Is the function initialized?
+ * The constructor used for testing.
+ * @param profilerClientFactory
*/
+ public GetProfile(ProfilerClientFactory profilerClientFactory) {
+ this.profilerClientFactory = profilerClientFactory;
+ }
+
@Override
- public boolean isInitialized() {
- return true;
+ public void initialize(Context context) {
+ Map<String, Object> globals = getGlobals(context);
+ profilerClient = profilerClientFactory.create(globals);
}
- /**
- * Apply the function.
- * @param args The function arguments.
- * @param context
- */
@Override
- public Object apply(List<Object> args, Context context) throws ParseException {
+ public boolean isInitialized() {
+ return profilerClient != null;
+ }
- String profile = getArg(0, String.class, args);
- String entity = getArg(1, String.class, args);
- Optional<List<ProfilePeriod>> periods = Optional.ofNullable(getArg(2, List.class, args));
- //Optional arguments
- @SuppressWarnings("unchecked")
- List<Object> groups = null;
- Map configOverridesMap = null;
- if (args.size() < 4) {
- // no optional args, so default 'groups' and configOverridesMap remains null.
- groups = new ArrayList<>(0);
- }
- else if (args.get(3) instanceof List) {
- // correct extensible usage
- groups = getArg(3, List.class, args);
- if (args.size() >= 5) {
- configOverridesMap = getArg(4, Map.class, args);
- if (configOverridesMap.isEmpty()) configOverridesMap = null;
- }
- }
- else {
- // Deprecated "varargs" style usage for groups_list
- // configOverridesMap cannot be specified so it remains null.
- groups = getGroupsArg(3, args);
+ @Override
+ public void close() throws IOException {
+ if(profilerClient != null) {
+ profilerClient.close();
}
+ }
- Map<String, Object> effectiveConfig = getEffectiveConfig(context, configOverridesMap);
- Object defaultValue = null;
- //lazily create new profiler client if needed
- if (client == null || !cachedConfigMap.equals(effectiveConfig)) {
- RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(effectiveConfig);
- ColumnBuilder columnBuilder = getColumnBuilder(effectiveConfig);
- HTableInterface table = getTable(effectiveConfig);
- long periodDuration = getPeriodDurationInMillis(effectiveConfig);
- client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDuration);
- cachedConfigMap = effectiveConfig;
- }
- if(cachedConfigMap != null) {
- defaultValue = ProfilerClientConfig.PROFILER_DEFAULT_VALUE.get(cachedConfigMap);
+ @Override
+ public Object apply(List<Object> args, Context context) throws ParseException {
+ // required arguments
+ String profile = getArg(PROFILE_ARG_INDEX, String.class, args);
+ String entity = getArg(ENTITY_ARG_INDEX, String.class, args);
+ List<ProfilePeriod> periods = getArg(PERIOD_ARG_INDEX, List.class, args);
+
+ // optional arguments
+ List<Object> groups = getGroups(args);
+ Map<String, Object> overrides = getOverrides(args);
+
+ // lazily create new profiler client if needed
+ Map<String, Object> effectiveConfig = getEffectiveConfig(context, overrides);
+ if (profilerClient == null || !lastKnownGlobals.equals(effectiveConfig)) {
+ profilerClient = profilerClientFactory.create(effectiveConfig);
+ lastKnownGlobals = effectiveConfig;
}
- List<ProfileMeasurement> measurements = client.fetch(Object.class, profile, entity, groups,
- periods.orElse(new ArrayList<>(0)), Optional.ofNullable(defaultValue));
+ // is there a default value?
+ Optional<Object> defaultValue = Optional.empty();
+ if(effectiveConfig != null) {
+ defaultValue = Optional.ofNullable(PROFILER_DEFAULT_VALUE.get(effectiveConfig));
+ }
// return only the value of each profile measurement
+ List<ProfileMeasurement> measurements = profilerClient.fetch(Object.class, profile, entity, groups, periods, defaultValue);
List<Object> values = new ArrayList<>();
for(ProfileMeasurement m: measurements) {
values.add(m.getProfileValue());
}
+
return values;
}
+ private Map<String, Object> getOverrides(List<Object> args) {
+ Map<String, Object> configOverridesMap = null;
+ if(args.size() > CONFIG_OVERRIDES_ARG_INDEX && args.get(GROUPS_ARG_INDEX) instanceof List) {
+ configOverridesMap = getArg(CONFIG_OVERRIDES_ARG_INDEX, Map.class, args);
+ if (configOverridesMap.isEmpty()) {
+ configOverridesMap = null;
+ }
+ }
+ return configOverridesMap;
+ }
+
+ private List<Object> getGroups(List<Object> args) {
+ List<Object> groups;
+ if (args.size() < CONFIG_OVERRIDES_ARG_INDEX) {
+ // no optional args, so default 'groups' and configOverridesMap remains null.
+ groups = new ArrayList<>(0);
+
+ } else if (args.get(GROUPS_ARG_INDEX) instanceof List) {
+ // correct extensible usage
+ groups = getArg(GROUPS_ARG_INDEX, List.class, args);
+
+ } else {
+ // deprecated "varargs" style usage for groups_list
+ groups = getVarArgGroups(GROUPS_ARG_INDEX, args);
+ }
+ return groups;
+ }
+
/**
* Get the groups defined by the user.
*
@@ -201,7 +220,7 @@ public class GetProfile implements StellarFunction {
* @param args The function arguments.
* @return The groups.
*/
- private List<Object> getGroupsArg(int startIndex, List<Object> args) {
+ private static List<Object> getVarArgGroups(int startIndex, List<Object> args) {
List<Object> groups = new ArrayList<>();
for(int i=startIndex; i<args.size(); i++) {
@@ -212,76 +231,8 @@ public class GetProfile implements StellarFunction {
return groups;
}
- /**
- * Creates the ColumnBuilder to use in accessing the profile data.
- * @param global The global configuration.
- */
- private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
- ColumnBuilder columnBuilder;
-
- String columnFamily = PROFILER_COLUMN_FAMILY.get(global, String.class);
- columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-
- return columnBuilder;
- }
-
- /**
- * Creates the ColumnBuilder to use in accessing the profile data.
- * @param global The global configuration.
- */
- private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
-
- // how long is the profile period?
- long duration = PROFILER_PERIOD.get(global, Long.class);
- LOG.debug("profiler client: {}={}", PROFILER_PERIOD, duration);
-
- // which units are used to define the profile period?
- String configuredUnits = PROFILER_PERIOD_UNITS.get(global, String.class);
- TimeUnit units = TimeUnit.valueOf(configuredUnits);
- LOG.debug("profiler client: {}={}", PROFILER_PERIOD_UNITS, units);
-
- // what is the salt divisor?
- Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
- LOG.debug("profiler client: {}={}", PROFILER_SALT_DIVISOR, saltDivisor);
-
- return new SaltyRowKeyBuilder(saltDivisor, duration, units);
- }
-
- /**
- * Create an HBase table used when accessing HBase.
- * @param global The global configuration.
- * @return
- */
- private HTableInterface getTable(Map<String, Object> global) {
-
- String tableName = PROFILER_HBASE_TABLE.get(global, String.class);
- TableProvider provider = getTableProvider(global);
-
- try {
- return provider.getTable(HBaseConfiguration.create(), tableName);
-
- } catch (IOException e) {
- throw new IllegalArgumentException(String.format("Unable to access table: %s", tableName), e);
- }
- }
-
- /**
- * Create the TableProvider to use when accessing HBase.
- * @param global The global configuration.
- */
- private TableProvider getTableProvider(Map<String, Object> global) {
- String clazzName = PROFILER_HBASE_TABLE_PROVIDER.get(global, String.class);
-
- TableProvider provider;
- try {
- @SuppressWarnings("unchecked")
- Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(clazzName);
- provider = clazz.getConstructor().newInstance();
-
- } catch (Exception e) {
- provider = new HTableProvider();
- }
-
- return provider;
+ private static Map<String, Object> getGlobals(Context context) {
+ return (Map<String, Object>) context.getCapability(GLOBAL_CONFIG)
+ .orElse(Collections.emptyMap());
}
}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
index 1715b23..08ee064 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
@@ -20,8 +20,9 @@
package org.apache.metron.profiler.client.stellar;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
import java.util.Map;
@@ -42,6 +43,11 @@ public enum ProfilerClientConfig {
PROFILER_HBASE_TABLE_PROVIDER("hbase.provider.impl", HTableProvider.class.getName(), String.class),
/**
+ * A global property that defines the name of the {@link HBaseConnectionFactory} implementation class.
+ */
+ PROFILER_HBASE_CONNECTION_FACTORY("hbase.connection.factory", HBaseConnectionFactory.class.getName(), String.class),
+
+ /**
* A global property that defines the duration of each profile period. This value
* should be defined along with 'profiler.client.period.duration.units'.
*/
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
index ea85c56..304cbe3 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
@@ -75,7 +75,7 @@ public class Util {
* @return effective config Map with overrides applied.
* @throws ParseException - if any override values are of wrong type.
*/
- public static Map<String, Object> getEffectiveConfig(Context context , Map configOverridesMap ) throws ParseException {
+ public static Map<String, Object> getEffectiveConfig(Context context, Map configOverridesMap) throws ParseException {
// ensure the required capabilities are defined
final Context.Capabilities[] required = { GLOBAL_CONFIG };
validateCapabilities(context, required);
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java
index 9e857aa..31106da 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java
@@ -18,18 +18,12 @@
package org.apache.metron.profiler.client.stellar;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.ProfilePeriod;
-import org.apache.metron.profiler.client.HBaseProfilerClient;
+import org.apache.metron.profiler.client.HBaseProfilerClientFactory;
import org.apache.metron.profiler.client.ProfilerClient;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.client.ProfilerClientFactories;
+import org.apache.metron.profiler.client.ProfilerClientFactory;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.Stellar;
@@ -45,15 +39,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_DEFAULT_VALUE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
import static org.apache.metron.profiler.client.stellar.Util.getArg;
-import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationInMillis;
import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
/**
@@ -88,69 +76,101 @@ import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
returns="A map for each profile measurement containing the profile name, entity, period, and value."
)
public class VerboseProfile implements StellarFunction {
-
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- protected static String PROFILE_KEY = "profile";
- protected static String ENTITY_KEY = "entity";
- protected static String PERIOD_KEY = "period";
- protected static String PERIOD_START_KEY = "period.start";
- protected static String PERIOD_END_KEY = "period.end";
- protected static String VALUE_KEY = "value";
- protected static String GROUPS_KEY = "groups";
- private ProfilerClient client;
+ protected static final String PROFILE_KEY = "profile";
+ protected static final String ENTITY_KEY = "entity";
+ protected static final String PERIOD_KEY = "period";
+ protected static final String PERIOD_START_KEY = "period.start";
+ protected static final String PERIOD_END_KEY = "period.end";
+ protected static final String VALUE_KEY = "value";
+ protected static final String GROUPS_KEY = "groups";
+ private static final int PROFILE_ARG_INDEX = 0;
+ private static final int ENTITY_ARG_INDEX = 1;
+ private static final int PERIOD_ARG_INDEX = 2;
+ private static final int GROUPS_ARG_INDEX = 3;
+
+ /**
+ * The default constructor used during Stellar function resolution.
+ */
+ public VerboseProfile() {
+ this(ProfilerClientFactories.DEFAULT);
+ }
+
+ /**
+ * The constructor used for testing.
+ * @param profilerClientFactory
+ */
+ public VerboseProfile(ProfilerClientFactory profilerClientFactory) {
+ this.profilerClientFactory = profilerClientFactory;
+ }
+
+ /**
+ * Allows the function to retrieve persisted {@link ProfileMeasurement} values.
+ */
+ private ProfilerClient profilerClient;
+
+ /**
+ * Creates the {@link ProfilerClient} used by this function.
+ */
+ private ProfilerClientFactory profilerClientFactory;
@Override
public void initialize(Context context) {
- // nothing to do
+ // values stored in the global config that are used to initialize the ProfilerClient
+ // are read only once during initialization. if those values change during a Stellar
+ // session, this function will not respond to them. the Stellar session would need to be
+ // restarted for those changes to take effect. this differs from the behavior of `PROFILE_GET`.
+ Map<String, Object> globals = getGlobals(context);
+ profilerClient = profilerClientFactory.create(globals);
}
@Override
public boolean isInitialized() {
- return true;
+ return profilerClient != null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(profilerClient != null) {
+ profilerClient.close();
+ }
}
@Override
public Object apply(List<Object> args, Context context) throws ParseException {
// required arguments
- String profile = getArg(0, String.class, args);
- String entity = getArg(1, String.class, args);
- List<ProfilePeriod> periods = getArg(2, List.class, args);
+ String profile = getArg(PROFILE_ARG_INDEX, String.class, args);
+ String entity = getArg(ENTITY_ARG_INDEX, String.class, args);
+ List<ProfilePeriod> periods = getArg(PERIOD_ARG_INDEX, List.class, args);
// optional 'groups' argument
List<Object> groups = new ArrayList<>();
- if(args.size() >= 4) {
- groups = getArg(3, List.class, args);
- }
-
- // get globals from the context
- Map<String, Object> globals = (Map<String, Object>) context.getCapability(GLOBAL_CONFIG)
- .orElse(Collections.emptyMap());
-
- // lazily create the profiler client, if needed
- if (client == null) {
- RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(globals);
- ColumnBuilder columnBuilder = getColumnBuilder(globals);
- HTableInterface table = getTable(globals);
- long periodDuration = getPeriodDurationInMillis(globals);
- client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDuration);
+ if(args.size() > GROUPS_ARG_INDEX) {
+ groups = getArg(GROUPS_ARG_INDEX, List.class, args);
}
// is there a default value?
Optional<Object> defaultValue = Optional.empty();
+ Map<String, Object> globals = getGlobals(context);
if(globals != null) {
defaultValue = Optional.ofNullable(PROFILER_DEFAULT_VALUE.get(globals));
}
- List<ProfileMeasurement> measurements = client.fetch(Object.class, profile, entity, groups, periods, defaultValue);
-
// render a view of each profile measurement
+ List<ProfileMeasurement> measurements = profilerClient.fetch(Object.class, profile, entity, groups, periods, defaultValue);
List<Object> results = new ArrayList<>();
for(ProfileMeasurement measurement: measurements) {
results.add(render(measurement));
}
+
return results;
}
+ private static Map<String, Object> getGlobals(Context context) {
+ return (Map<String, Object>) context.getCapability(GLOBAL_CONFIG)
+ .orElse(Collections.emptyMap());
+ }
+
/**
* Renders a view of the profile measurement.
* @param measurement The profile measurement to render.
@@ -166,57 +186,4 @@ public class VerboseProfile implements StellarFunction {
view.put(GROUPS_KEY, measurement.getGroups());
return view;
}
-
- /**
- * Creates the ColumnBuilder to use in accessing the profile data.
- * @param global The global configuration.
- */
- private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
- String columnFamily = PROFILER_COLUMN_FAMILY.get(global, String.class);
- return new ValueOnlyColumnBuilder(columnFamily);
- }
-
- /**
- * Creates the ColumnBuilder to use in accessing the profile data.
- * @param global The global configuration.
- */
- private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
- Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
- return new SaltyRowKeyBuilder(saltDivisor, getPeriodDurationInMillis(global), TimeUnit.MILLISECONDS);
- }
-
- /**
- * Create an HBase table used when accessing HBase.
- * @param global The global configuration.
- * @return
- */
- private HTableInterface getTable(Map<String, Object> global) {
- String tableName = PROFILER_HBASE_TABLE.get(global, String.class);
- TableProvider provider = getTableProvider(global);
- try {
- return provider.getTable(HBaseConfiguration.create(), tableName);
-
- } catch (IOException e) {
- throw new IllegalArgumentException(String.format("Unable to access table: %s", tableName), e);
- }
- }
-
- /**
- * Create the TableProvider to use when accessing HBase.
- * @param global The global configuration.
- */
- private TableProvider getTableProvider(Map<String, Object> global) {
- String clazzName = PROFILER_HBASE_TABLE_PROVIDER.get(global, String.class);
- TableProvider provider;
- try {
- @SuppressWarnings("unchecked")
- Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(clazzName);
- provider = clazz.getConstructor().newInstance();
-
- } catch (Exception e) {
- provider = new HTableProvider();
- }
-
- return provider;
- }
}
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientFactoryTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientFactoryTest.java
new file mode 100644
index 0000000..8465256
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientFactoryTest.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client;
+
+import org.apache.metron.hbase.client.FakeHBaseConnectionFactory;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_CONNECTION_FACTORY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link HBaseProfilerClientFactory}.
+ */
+public class HBaseProfilerClientFactoryTest {
+
+ private static final String tableName = "table";
+ private static final String columnFamily = "columnFamily";
+ private static final Integer periodDuration = 23;
+ private static final TimeUnit periodDurationUnits = TimeUnit.MINUTES;
+ private static final Integer saltDivisor = 1000;
+ private static final long periodDurationMillis = periodDurationUnits.toMillis(23);
+ private HBaseProfilerClientFactory factory;
+
+ @Before
+ public void setup() {
+ factory = new HBaseProfilerClientFactory();
+ }
+
+ @Test
+ public void testCreate() {
+ Map<String, Object> globals = new HashMap<String, Object>() {{
+ put(PROFILER_HBASE_TABLE.getKey(), tableName);
+ put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+ put(PROFILER_SALT_DIVISOR.getKey(), saltDivisor.toString());
+ put(PROFILER_HBASE_CONNECTION_FACTORY.getKey(), FakeHBaseConnectionFactory.class.getName());
+ put(PROFILER_PERIOD.getKey(), periodDuration.toString());
+ put(PROFILER_PERIOD_UNITS.getKey(), periodDurationUnits.toString());
+ }};
+
+ HBaseProfilerClient client = factory.create(globals);
+ assertEquals(periodDurationMillis, client.getPeriodDurationMillis());
+
+ // validate the row key builder that is created
+ SaltyRowKeyBuilder rowKeyBuilder = (SaltyRowKeyBuilder) client.getRowKeyBuilder();
+ assertEquals(saltDivisor, (Integer) rowKeyBuilder.getSaltDivisor());
+ assertEquals(periodDurationMillis, rowKeyBuilder.getPeriodDurationMillis());
+
+ // validate the column builder that is created
+ ValueOnlyColumnBuilder columnBuilder = (ValueOnlyColumnBuilder) client.getColumnBuilder();
+ assertEquals(columnFamily, columnBuilder.getColumnFamily());
+ }
+
+ @Test
+ public void testCreateUsingDefaultValues() {
+ Map<String, Object> globals = new HashMap<String, Object>() {{
+ // without using a mock connection factory, the test will hang trying to connect to HBase
+ put(PROFILER_HBASE_CONNECTION_FACTORY.getKey(), FakeHBaseConnectionFactory.class.getName());
+ }};
+
+ // find what the default values should be
+ final long defaultPeriodDuration = (Long) PROFILER_PERIOD.getDefault();
+ final TimeUnit defaultPeriodDurationUnits = TimeUnit.valueOf((String) PROFILER_PERIOD_UNITS.getDefault());
+ final long defaultPeriodDurationMillis = defaultPeriodDurationUnits.toMillis(defaultPeriodDuration);
+ final long defaultSaltDivisor = (Long) PROFILER_SALT_DIVISOR.getDefault();
+ final String defaultColumnFamily = (String) PROFILER_COLUMN_FAMILY.getDefault();
+
+ HBaseProfilerClient client = factory.create(globals);
+ assertEquals(defaultPeriodDurationMillis, client.getPeriodDurationMillis());
+
+ // validate the row key builder that is created
+ SaltyRowKeyBuilder rowKeyBuilder = (SaltyRowKeyBuilder) client.getRowKeyBuilder();
+ assertEquals(defaultSaltDivisor, rowKeyBuilder.getSaltDivisor());
+ assertEquals(defaultPeriodDurationMillis, rowKeyBuilder.getPeriodDurationMillis());
+
+ // validate the column builder that is created
+ ValueOnlyColumnBuilder columnBuilder = (ValueOnlyColumnBuilder) client.getColumnBuilder();
+ assertEquals(defaultColumnFamily, columnBuilder.getColumnFamily());
+ }
+}
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
index cc3748e..f2dce99 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -20,184 +20,187 @@
package org.apache.metron.profiler.client;
-import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.apache.metron.hbase.client.HBaseClient;
import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
-import org.apache.metron.stellar.common.StellarStatefulExecutor;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
- * Tests the HBaseProfilerClient.
- *
- * The naming used in this test attempts to be as similar to how the 'groupBy'
- * functionality might be used 'in the wild'. This test involves reading and
- * writing two separate groups originating from the same Profile and Entity.
- * There is a 'weekdays' group which contains all measurements taken on weekdays.
- * There is also a 'weekend' group which contains all measurements taken on weekends.
+ * Tests the {@link HBaseProfilerClient}.
*/
public class HBaseProfilerClientTest {
-
private static final String tableName = "profiler";
private static final String columnFamily = "P";
+ private static final byte[] columnFamilyB = Bytes.toBytes(columnFamily);
+ private static final byte[] columnQualifier = Bytes.toBytes("column");
private static final long periodDuration = 15;
private static final TimeUnit periodUnits = TimeUnit.MINUTES;
private static final int periodsPerHour = 4;
-
- private HBaseProfilerClient client;
- private StellarStatefulExecutor executor;
- private MockHTable table;
- private ProfileWriter profileWriter;
+ private static final byte[] expectedRowKey = Bytes.toBytes("some-row-key");
+ private static final String profileName = "profile1";
+ private static final String entityName = "entity1";
+ private static final int profileValue = 1231121;
+ private static final byte[] profileValueB = SerDeUtils.toBytes(profileValue);
+ private long periodDurationMillis = periodUnits.toMillis(periodDuration);
+
+ private HBaseClient hbaseClient;
+ private HBaseProfilerClient profilerClient;
+ private ProfileMeasurement expected;
+ private RowKeyBuilder rowKeyBuilder;
+ private ColumnBuilder columnBuilder;
+ private Result expectedResult;
+ private Result emptyResult;
@Before
- public void setup() throws Exception {
- table = new MockHTable(tableName, columnFamily);
- executor = new DefaultStellarStatefulExecutor();
-
- // writes values to be read during testing
- long periodDurationMillis = periodUnits.toMillis(periodDuration);
- RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
- ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
- profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
-
- client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDurationMillis);
+ public void setup() {
+ // create a profile measurement used in the tests
+ expected = new ProfileMeasurement()
+ .withProfileName(profileName)
+ .withEntity(entityName)
+ .withPeriod(System.currentTimeMillis(), 5, TimeUnit.MINUTES)
+ .withProfileValue(profileValue);
+
+ // mock row key builder needs to return a row key for the profile measurement used in the tests
+ rowKeyBuilder = mock(RowKeyBuilder.class);
+ when(rowKeyBuilder.rowKey(any())).thenReturn(expectedRowKey);
+
+ // mock column builder - column family/qualifier comes from the column builder
+ columnBuilder = mock(ColumnBuilder.class);
+ when(columnBuilder.getColumnFamily()).thenReturn(columnFamily);
+ when(columnBuilder.getColumnQualifier(eq("value"))).thenReturn(columnQualifier);
+
+ // this mock is used to feed data to the profiler client while testing
+ hbaseClient = mock(HBaseTableClient.class);
+
+ // a result that matches the expected profile measurement that can be return by the mock hbase client
+ expectedResult = mock(Result.class);
+ when(expectedResult.containsColumn(eq(columnFamilyB), eq(columnQualifier))).thenReturn(true);
+ when(expectedResult.getValue(eq(columnFamilyB), eq(columnQualifier))).thenReturn(profileValueB);
+
+ // an empty result to use in the tests
+ emptyResult = mock(Result.class);
+ when(emptyResult.containsColumn(any(), any())).thenReturn(false);
+
+ // create the profiler client that will be tested
+ profilerClient = new HBaseProfilerClient(hbaseClient, rowKeyBuilder, columnBuilder, periodDurationMillis);
}
- @After
- public void tearDown() throws Exception {
- table.clear();
+ @Test
+ public void shouldFetchProfileMeasurement() {
+ // need the hbase client to return a Result matching the expected profile measurement value
+ Result[] results = new Result[] { expectedResult };
+ when(hbaseClient.getAll()).thenReturn(results);
+
+ List<ProfileMeasurement> measurements = profilerClient.fetch(
+ Object.class,
+ expected.getProfileName(),
+ expected.getEntity(),
+ expected.getGroups(),
+ Arrays.asList(expected.getPeriod()),
+ Optional.empty());
+ assertEquals(1, measurements.size());
+ assertEquals(expected, measurements.get(0));
}
@Test
- public void Should_ReturnMeasurements_When_DataExistsForAGroup() throws Exception {
- final String profile = "profile1";
- final String entity = "entity1";
- final int expectedValue = 2302;
- final int hours = 2;
- final int count = hours * periodsPerHour + 1;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-
- // setup - write two groups of measurements - 'weekends' and 'weekdays'
- ProfileMeasurement prototype = new ProfileMeasurement()
- .withProfileName(profile)
- .withEntity(entity)
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(prototype, count, Arrays.asList("weekdays"), val -> expectedValue);
- profileWriter.write(prototype, count, Arrays.asList("weekends"), val -> 0);
-
- long end = System.currentTimeMillis();
- long start = end - TimeUnit.HOURS.toMillis(2);
- {
- //validate "weekday" results
- List<Object> groups = Arrays.asList("weekdays");
- List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, groups, start, end, Optional.empty());
- assertEquals(count, results.size());
- results.forEach(actual -> {
- assertEquals(profile, actual.getProfileName());
- assertEquals(entity, actual.getEntity());
- assertEquals(groups, actual.getGroups());
- assertEquals(expectedValue, actual.getProfileValue());
- });
- }
- {
- //validate "weekend" results
- List<Object> groups = Arrays.asList("weekends");
- List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, groups, start, end, Optional.empty());
- assertEquals(count, results.size());
- results.forEach(actual -> {
- assertEquals(profile, actual.getProfileName());
- assertEquals(entity, actual.getEntity());
- assertEquals(groups, actual.getGroups());
- assertEquals(0, actual.getProfileValue());
- });
- }
+ public void shouldFetchNothingWhenNothingThere() {
+ // the hbase client will indicate their are no hits
+ Result[] results = new Result[] { emptyResult };
+ when(hbaseClient.getAll()).thenReturn(results);
+
+ List<ProfileMeasurement> measurements = profilerClient.fetch(
+ Object.class,
+ expected.getProfileName(),
+ expected.getEntity(),
+ expected.getGroups(),
+ Arrays.asList(expected.getPeriod()),
+ Optional.empty());
+ assertEquals(0, measurements.size());
}
@Test
- public void Should_ReturnResultFromGroup_When_MultipleGroupsExist() throws Exception {
- final String profile = "profile1";
- final String entity = "entity1";
- final int periodsPerHour = 4;
- final int expectedValue = 2302;
- final int hours = 2;
- final int count = hours * periodsPerHour;
- final long endTime = System.currentTimeMillis();
- final long startTime = endTime - TimeUnit.HOURS.toMillis(hours);
-
- // setup - write two groups of measurements - 'weekends' and 'weekdays'
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
- profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
-
- List<Object> weekdays = Arrays.asList("weekdays");
- List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, weekdays, startTime, endTime, Optional.empty());
-
- // should only return results from 'weekdays' group
- assertEquals(count, results.size());
- results.forEach(actual -> assertEquals(weekdays, actual.getGroups()));
+ public void shouldFetchDefaultValueWhenNothingThere() {
+ // the hbase client will indicate their are no hits
+ Result[] results = new Result[] { emptyResult };
+ when(hbaseClient.getAll()).thenReturn(results);
+
+ List<ProfileMeasurement> measurements = profilerClient.fetch(
+ Object.class,
+ expected.getProfileName(),
+ expected.getEntity(),
+ expected.getGroups(),
+ Arrays.asList(expected.getPeriod()),
+ Optional.of(profileValue));
+
+ // expect the default value to be returned
+ assertEquals(1, measurements.size());
+ assertEquals(expected, measurements.get(0));
}
@Test
- public void Should_ReturnNoResults_When_GroupDoesNotExist() {
- final String profile = "profile1";
- final String entity = "entity1";
- final int periodsPerHour = 4;
- final int expectedValue = 2302;
- final int hours = 2;
- final int count = hours * periodsPerHour;
- final long endTime = System.currentTimeMillis();
- final long startTime = endTime - TimeUnit.HOURS.toMillis(hours);
-
- // create two groups of measurements - one on weekdays and one on weekends
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
- profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
-
- // should return no results when the group does not exist
- List<Object> groups = Arrays.asList("does-not-exist");
- List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, groups, startTime, endTime, Optional.empty());
- assertEquals(0, results.size());
+ public void shouldFetchMultipleProfilePeriods() {
+ // need the hbase client to return a Result matching the expected profile measurement value
+ Result[] results = new Result[] { expectedResult, expectedResult, expectedResult, expectedResult };
+ when(hbaseClient.getAll()).thenReturn(results);
+
+ // fetching across multiple periods
+ ProfilePeriod start = ProfilePeriod.fromPeriodId(1L, 15L, TimeUnit.MINUTES);
+ List<ProfilePeriod> periods = new ArrayList<ProfilePeriod>() {{
+ add(start);
+ add(start.next());
+ add(start.next());
+ add(start.next());
+ }};
+
+ List<ProfileMeasurement> measurements = profilerClient.fetch(
+ Object.class,
+ expected.getProfileName(),
+ expected.getEntity(),
+ expected.getGroups(),
+ periods,
+ Optional.empty());
+
+ // the row key builder should be called once for each profile period
+ ArgumentCaptor<ProfileMeasurement> captor = new ArgumentCaptor<>();
+ verify(rowKeyBuilder, times(4)).rowKey(captor.capture());
+
+ // the profile periods should match those originally submited
+ List<ProfileMeasurement> submitted = captor.getAllValues();
+ assertEquals(periods.get(0), submitted.get(0).getPeriod());
+ assertEquals(periods.get(1), submitted.get(1).getPeriod());
+ assertEquals(periods.get(2), submitted.get(2).getPeriod());
+ assertEquals(periods.get(3), submitted.get(3).getPeriod());
+
+ assertEquals(4, measurements.size());
}
@Test
- public void Should_ReturnNoResults_When_NoDataInStartToEnd() throws Exception {
- final String profile = "profile1";
- final String entity = "entity1";
- final int hours = 2;
- int numberToWrite = hours * periodsPerHour;
- final List<Object> group = Arrays.asList("weekends");
- final long measurementTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
-
- // write some data with a timestamp of s1 day ago
- ProfileMeasurement prototype = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(measurementTime, periodDuration, periodUnits);
- profileWriter.write(prototype, numberToWrite, group, val -> 1000);
-
- // should return no results when [start,end] is long after when test data was written
- final long endFetchAt = System.currentTimeMillis();
- final long startFetchAt = endFetchAt - TimeUnit.MILLISECONDS.toMillis(30);
- List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, group, startFetchAt, endFetchAt, Optional.empty());
- assertEquals(0, results.size());
+ public void shouldCloseHBaseClient() throws IOException {
+ profilerClient.close();
+ verify(hbaseClient, times(1)).close();
}
}
\ No newline at end of file
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
deleted file mode 100644
index d4e5e66..0000000
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.client;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.ColumnList;
-import org.apache.metron.hbase.client.LegacyHBaseClient;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.ProfilePeriod;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-/**
- * Writes ProfileMeasurement values that can be read during automated testing.
- */
-public class ProfileWriter {
-
- private RowKeyBuilder rowKeyBuilder;
- private ColumnBuilder columnBuilder;
- private LegacyHBaseClient hbaseClient;
- private HBaseProfilerClient client;
-
- public ProfileWriter(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder, HTableInterface table, long periodDurationMillis) {
- this.rowKeyBuilder = rowKeyBuilder;
- this.columnBuilder = columnBuilder;
- this.hbaseClient = new LegacyHBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString());
- this.client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDurationMillis);
- }
-
- /**
- * Writes profile measurements that can be used for testing.
- *
- * @param prototype A prototype for the types of ProfileMeasurements that should be written.
- * @param count The number of profile measurements to write.
- * @param group The name of the group.
- * @param valueGenerator A function that consumes the previous ProfileMeasurement value and produces the next.
- */
- public void write(ProfileMeasurement prototype, int count, List<Object> group, Function<Object, Object> valueGenerator) {
-
- ProfileMeasurement m = prototype;
- ProfilePeriod period = m.getPeriod();
- for(int i=0; i<count; i++) {
- // generate the next value that should be written
- Object nextValue = valueGenerator.apply(m.getProfileValue());
-
- // write the measurement
- m = new ProfileMeasurement()
- .withProfileName(prototype.getProfileName())
- .withEntity(prototype.getEntity())
- .withPeriod(period)
- .withGroups(group)
- .withProfileValue(nextValue);
- write(m);
-
- // advance to the next period
- period = m.getPeriod().next();
- }
- }
-
- /**
- * Write a ProfileMeasurement.
- * @param m The ProfileMeasurement to write.
- */
- private void write(ProfileMeasurement m) {
-
- byte[] rowKey = rowKeyBuilder.rowKey(m);
- ColumnList cols = columnBuilder.columns(m);
-
- hbaseClient.addMutation(rowKey, cols, Durability.SKIP_WAL);
- hbaseClient.mutate();
- }
-
- public static void main(String[] args) throws Exception {
- RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
- ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder();
-
- Configuration config = HBaseConfiguration.create();
- config.set("hbase.master.hostname", "node1");
- config.set("hbase.regionserver.hostname", "node1");
- config.set("hbase.zookeeper.quorum", "node1");
-
- HTableProvider provider = new HTableProvider();
- HTableInterface table = provider.getTable(config, "profiler");
-
- long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
- long when = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
- ProfileMeasurement measure = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("192.168.66.121")
- .withPeriod(when, periodDurationMillis, TimeUnit.MILLISECONDS);
-
- ProfileWriter writer = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
- writer.write(measure, 2 * 24 * 4, Collections.emptyList(), val -> new Random().nextInt(10));
- }
-}
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
index 3fbed1c..41afa04 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
@@ -20,437 +20,191 @@
package org.apache.metron.profiler.client.stellar;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.client.ProfileWriter;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.client.ProfilerClient;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
-import org.apache.metron.stellar.dsl.functions.resolver.SingletonFunctionResolver;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
- * Tests the GetProfile class.
+ * Tests the 'PROFILE_GET' function in the {@link GetProfile} class.
*/
public class GetProfileTest {
- private static final long periodDuration = 15;
- private static final TimeUnit periodUnits = TimeUnit.MINUTES;
- private static final int saltDivisor = 1000;
- private static final String tableName = "profiler";
- private static final String columnFamily = "P";
private StellarStatefulExecutor executor;
- private Map<String, Object> state;
- private ProfileWriter profileWriter;
- // different values of period and salt divisor, used to test config_overrides feature
- private static final long periodDuration2 = 1;
- private static final TimeUnit periodUnits2 = TimeUnit.HOURS;
- private static final int saltDivisor2 = 2050;
-
- private <T> T run(String expression, Class<T> clazz) {
- return executor.execute(expression, state, clazz);
+ private FunctionResolver functionResolver;
+ private Map<String, Object> globals;
+ private GetProfile function;
+ private ProfilerClient profilerClient;
+ private List<Integer> results;
+ private ProfileMeasurement expected;
+ private ProfileMeasurement defaultMeasurement;
+ private Object defaultValue;
+
+ private List run(String expression) {
+ return executor.execute(expression, new HashMap<>(), List.class);
}
- /**
- * This method sets up the configuration context for both writing profile data
- * (using profileWriter to mock the complex process of what the Profiler topology
- * actually does), and then reading that profile data (thereby testing the PROFILE_GET
- * Stellar client implemented in GetProfile).
- *
- * It runs at @Before time, and sets testclass global variables used by the writers and readers.
- * The various writers and readers are in each test case, not here.
- *
- * @return void
- */
@Before
public void setup() {
- state = new HashMap<>();
- final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, columnFamily);
-
- // used to write values to be read during testing
- long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
- RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
- ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
- profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
-
- // global properties
- Map<String, Object> global = new HashMap<String, Object>() {{
- put(PROFILER_HBASE_TABLE.getKey(), tableName);
- put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
- put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
- put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
- put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
- put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
- }};
-
- // create the stellar execution environment
- executor = new DefaultStellarStatefulExecutor(
- new SimpleFunctionResolver()
- .withClass(GetProfile.class)
- .withClass(FixedLookback.class),
- new Context.Builder()
- .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
- .build());
- }
-
- /**
- * This method is similar to setup(), in that it sets up profiler configuration context,
- * but only for the client. Additionally, it uses periodDuration2, periodUnits2
- * and saltDivisor2, instead of periodDuration, periodUnits and saltDivisor respectively.
- *
- * This is used in the unit tests that test the config_overrides feature of PROFILE_GET.
- * In these tests, the context from @Before setup() is used to write the data, then the global
- * context is changed to context2 (from this method). Each test validates that a default read
- * using global context2 then gets no valid results (as expected), and that a read using
- * original context values in the PROFILE_GET config_overrides argument gets all expected results.
- *
- * @return context2 - The profiler client configuration context created by this method.
- * The context2 values are also set in the configuration of the StellarStatefulExecutor
- * stored in the global variable 'executor'. However, there is no API for querying the
- * context values from a StellarStatefulExecutor, so we output the context2 Context object itself,
- * for validation purposes (so that its values can be validated as being significantly
- * different from the setup() settings).
- */
- private Context setup2() {
- state = new HashMap<>();
+ // the mock profiler client used to feed profile measurement values to the function
+ profilerClient = mock(ProfilerClient.class);
// global properties
- Map<String, Object> global = new HashMap<String, Object>() {{
- put(PROFILER_HBASE_TABLE.getKey(), tableName);
- put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
- put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
- put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration2));
- put(PROFILER_PERIOD_UNITS.getKey(), periodUnits2.toString());
- put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor2));
- }};
-
- // create the modified context
- Context context2 = new Context.Builder()
- .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+ globals = new HashMap<>();
+ Context context = new Context.Builder()
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals)
.build();
- // create the stellar execution environment
- executor = new DefaultStellarStatefulExecutor(
- new SimpleFunctionResolver()
- .withClass(GetProfile.class)
- .withClass(FixedLookback.class),
- context2);
-
- return context2; //because there is no executor.getContext() method
- }
+ // the PROFILE_GET function that will be tested
+ function = new GetProfile(globals -> profilerClient);
+ function.initialize(context);
- /**
- * Values should be retrievable that have NOT been stored within a group.
- */
- @Test
- public void testWithNoGroups() {
- final int periodsPerHour = 4;
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- final List<Object> group = Collections.emptyList();
+ // create the stellar execution environment
+ functionResolver = new SimpleFunctionResolver()
+ .withClass(FixedLookback.class)
+ .withInstance(function);
+ executor = new DefaultStellarStatefulExecutor(functionResolver, context);
- // setup - write some measurements to be read later
- final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement()
+ // create a profile measurement used in the tests
+ expected = new ProfileMeasurement()
.withProfileName("profile1")
.withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, count, group, val -> expectedValue);
-
- // execute - read the profile values - no groups
- String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
- @SuppressWarnings("unchecked")
- List<Integer> result = run(expr, List.class);
-
- // validate - expect to read all values from the past 4 hours
- Assert.assertEquals(count, result.size());
- result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
- }
-
- /**
- * Values should be retrievable that have been stored within a 'group'.
- */
- @Test
- public void testWithOneGroup() {
- final int periodsPerHour = 4;
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- final List<Object> group = Arrays.asList("weekends");
+ .withPeriod(System.currentTimeMillis(), 5, TimeUnit.MINUTES)
+ .withProfileValue(1231121);
- // setup - write some measurements to be read later
- final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement()
+ defaultValue = 7777777;
+ defaultMeasurement = new ProfileMeasurement()
.withProfileName("profile1")
.withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, count, group, val -> expectedValue);
-
- // create a variable that contains the groups to use
- state.put("groups", group);
-
- // execute - read the profile values
- String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
- @SuppressWarnings("unchecked")
- List<Integer> result = run(expr, List.class);
-
- // validate - expect to read all values from the past 4 hours
- Assert.assertEquals(count, result.size());
-
- // test the deprecated but allowed "varargs" form of groups specification
- expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekends')";
- result = run(expr, List.class);
-
- // validate - expect to read all values from the past 4 hours
- Assert.assertEquals(count, result.size());
- result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
+ .withPeriod(System.currentTimeMillis(), 5, TimeUnit.MINUTES)
+ .withProfileValue(defaultValue);
}
- /**
- * Values should be retrievable that have been stored within a 'group'.
- */
@Test
- public void testWithTwoGroups() {
- final int periodsPerHour = 4;
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- final List<Object> group = Arrays.asList("weekdays", "tuesday");
-
- // setup - write some measurements to be read later
- final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, count, group, val -> expectedValue);
-
- // create a variable that contains the groups to use
- state.put("groups", group);
-
- // execute - read the profile values
- String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekdays', 'tuesday'])";
- @SuppressWarnings("unchecked")
- List<Integer> result = run(expr, List.class);
-
- // validate - expect to read all values from the past 4 hours
- Assert.assertEquals(count, result.size());
-
- // test the deprecated but allowed "varargs" form of groups specification
- expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekdays', 'tuesday')";
- result = run(expr, List.class);
-
- // validate - expect to read all values from the past 4 hours
- Assert.assertEquals(count, result.size());
- result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
+ public void testGetProfileValue() {
+ // only one profile measurement exists
+ when(profilerClient.fetch(
+ eq(Object.class),
+ eq(expected.getProfileName()),
+ eq(expected.getEntity()),
+ eq(expected.getGroups()),
+ any(),
+ any())).thenReturn(Arrays.asList(expected));
+
+ // expect the one measurement to be returned
+ results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+ assertEquals(1, results.size());
+ assertEquals(expected.getProfileValue(), results.get(0));
}
- /**
- * Initialization should fail if the required context values are missing.
- */
- @Test(expected = ParseException.class)
- public void testMissingContext() {
- Context empty = Context.EMPTY_CONTEXT();
-
- // 'unset' the context that was created during setup()
- executor.setContext(empty);
-
- // force re-initialization with no context
- SingletonFunctionResolver.getInstance().initialize(empty);
-
- // validate - function should be unable to initialize
- String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(1000, 'SECONDS'), groups)";
- run(expr, List.class);
- }
-
- /**
- * If the time horizon specified does not include any profile measurements, then
- * none should be returned.
- */
@Test
- public void testOutsideTimeHorizon() {
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- final List<Object> group = Collections.emptyList();
-
- // setup - write a single value from 2 hours ago
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, 1, group, val -> expectedValue);
-
- // create a variable that contains the groups to use
- state.put("groups", group);
-
- // execute - read the profile values
- String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'SECONDS'))";
- @SuppressWarnings("unchecked")
- List<Integer> result = run(expr, List.class);
-
- // validate - there should be no values from only 4 seconds ago
- Assert.assertEquals(0, result.size());
+ public void testGetProfileValueWithGroup() {
+ // the profile measurement is part of a group
+ expected.withGroups(Arrays.asList("group1"));
+
+ // only one profile measurement exists
+ when(profilerClient.fetch(
+ eq(Object.class),
+ eq(expected.getProfileName()),
+ eq(expected.getEntity()),
+ eq(expected.getGroups()),
+ any(),
+ any())).thenReturn(Arrays.asList(expected));
+
+ // expect the one measurement to be returned
+ results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['group1'])");
+ assertEquals(1, results.size());
+ assertEquals(expected.getProfileValue(), results.get(0));
}
- /**
- * Default value should be able to be specified
- */
@Test
- public void testWithDefaultValue() {
- String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
- @SuppressWarnings("unchecked")
- List<Integer> result = run(expr, List.class);
-
- // validate - expect to fail to read any values because we didn't write any.
- Assert.assertEquals(0, result.size());
-
- // execute - read the profile values - with config_override.
- // first two override values are strings, third is deliberately a number.
- testOverride("{'profiler.default.value' : 0}", 0);
- testOverride("{'profiler.default.value' : 'metron'}", "metron");
- testOverride("{'profiler.default.value' : []}", new ArrayList<>());
+ public void testGetProfileValueWithDifferentGroup() {
+ // the profile measurement is part of a group
+ expected.withGroups(Arrays.asList("group1"));
+
+ // only one profile measurement exists
+ when(profilerClient.fetch(
+ eq(Object.class),
+ eq(expected.getProfileName()),
+ eq(expected.getEntity()),
+ eq(expected.getGroups()),
+ any(),
+ any())).thenReturn(Arrays.asList(expected));
+
+ // expect the one measurement to be returned
+ results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['group999'])");
+ assertEquals(0, results.size());
}
- private void testOverride(String overrides, Object defaultVal) {
- String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), [], " + overrides + ")";
- List<Object> result = run(expr, List.class);
-
- // validate - expect to read all values from the past 4 hours (16 or 17 values depending on start time)
- // but they should all be the default value.
- Assert.assertTrue(result.size() == 16 || result.size() == 17);
- result.forEach(actual -> Assert.assertEquals(defaultVal, actual));
+ @Test
+ public void shouldReturnNothingWhenNoMeasurementsExist() {
+ // no measurements exist
+ when(profilerClient.fetch(
+ eq(Object.class),
+ any(),
+ any(),
+ any(),
+ any(),
+ any())).thenReturn(Collections.emptyList());
+
+ // no measurements exist
+ results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+ assertEquals(0, results.size());
}
- /**
- * Values should be retrievable that were written with configuration different than current global config.
- */
@Test
- public void testWithConfigOverride() {
- final int periodsPerHour = 4;
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- final List<Object> group = Collections.emptyList();
-
- // setup - write some measurements to be read later
- final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, count, group, val -> expectedValue);
-
- // now change the executor configuration
- Context context2 = setup2();
- // validate it is changed in significant way
- @SuppressWarnings("unchecked")
- Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
- Assert.assertEquals(PROFILER_PERIOD.get(global), periodDuration2);
- Assert.assertNotEquals(periodDuration, periodDuration2);
-
- // execute - read the profile values - with (wrong) default global config values.
- // No error message at this time, but returns empty results list, because
- // row keys are not correctly calculated.
- String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
- @SuppressWarnings("unchecked")
- List<Integer> result = run(expr, List.class);
-
- // validate - expect to fail to read any values
- Assert.assertEquals(0, result.size());
-
- // execute - read the profile values - with config_override.
- // first two override values are strings, third is deliberately a number.
- String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
- + "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
- + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
- expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS', " + overrides + "), [], " + overrides + ")"
- ;
- result = run(expr, List.class);
-
- // validate - expect to read all values from the past 4 hours
- Assert.assertEquals(count, result.size());
- result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
+ public void shouldUseDefaultValueFromGlobals() {
+ // set a default value
+ globals.put("profiler.default.value", defaultValue);
+
+ // the underlying profile client is responsible for returning the default value, if no profiles are found
+ when(profilerClient.fetch(
+ eq(Object.class),
+ eq(expected.getProfileName()),
+ eq(expected.getEntity()),
+ eq(expected.getGroups()),
+ any(),
+ eq(Optional.of(defaultValue)))).thenReturn(Arrays.asList(defaultMeasurement));
+
+ results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+ assertEquals(1, results.size());
+ assertEquals(defaultValue, results.get(0));
}
- /**
- * Values should be retrievable that have been stored within a 'group', with
- * configuration different than current global config.
- * This time put the config_override case before the non-override case.
- */
@Test
- public void testWithConfigAndOneGroup() {
- final int periodsPerHour = 4;
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- final List<Object> group = Arrays.asList("weekends");
-
- // setup - write some measurements to be read later
- final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, count, group, val -> expectedValue);
-
- // create a variable that contains the groups to use
- state.put("groups", group);
-
- // now change the executor configuration
- Context context2 = setup2();
- // validate it is changed in significant way
- @SuppressWarnings("unchecked")
- Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
- Assert.assertEquals(global.get(PROFILER_PERIOD.getKey()), Long.toString(periodDuration2));
- Assert.assertNotEquals(periodDuration, periodDuration2);
-
- // execute - read the profile values - with config_override.
- // first two override values are strings, third is deliberately a number.
- String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
- + "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
- + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
- String expr = "PROFILE_GET('profile1', 'entity1'" +
- ", PROFILE_FIXED(4, 'HOURS', " + overrides + "), ['weekends'], " +
- overrides + ")";
- @SuppressWarnings("unchecked")
- List<Integer> result = run(expr, List.class);
-
- // validate - expect to read all values from the past 4 hours
- Assert.assertEquals(count, result.size());
-
- // execute - read the profile values - with (wrong) default global config values.
- // No error message at this time, but returns empty results list, because
- // row keys are not correctly calculated.
- expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
- result = run(expr, List.class);
-
- // validate - expect to fail to read any values
- Assert.assertEquals(0, result.size());
+ public void shouldUseDefaultValueFromOverrides() {
+ // the underlying profile client is responsible for returning the default value, if no profiles are found
+ when(profilerClient.fetch(
+ eq(Object.class),
+ eq(expected.getProfileName()),
+ eq(expected.getEntity()),
+ eq(expected.getGroups()),
+ any(),
+ eq(Optional.of(defaultValue)))).thenReturn(Arrays.asList(defaultMeasurement));
+
+ // set the default value in the overrides map
+ String overrides = String.format( "{ 'profiler.default.value': %s }", defaultValue);
+ results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), [], " + overrides + ")");
+ assertEquals(1, results.size());
+ assertEquals(defaultValue, results.get(0));
}
}
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java
index bd39007..6869046 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java
@@ -20,19 +20,13 @@
package org.apache.metron.profiler.client.stellar;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.client.ProfileWriter;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.client.ProfilerClient;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -41,180 +35,176 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.ENTITY_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.GROUPS_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.PERIOD_END_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.PERIOD_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.PERIOD_START_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.PROFILE_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.VALUE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
- * Tests the VerboseProfile class.
+ * Tests the 'PROFILE_VERBOSE' function in the {@link VerboseProfile} class.
*/
public class VerboseProfileTest {
- private static final long periodDuration = 15;
- private static final TimeUnit periodUnits = TimeUnit.MINUTES;
- private static final int saltDivisor = 1000;
- private static final String tableName = "profiler";
- private static final String columnFamily = "P";
+
private StellarStatefulExecutor executor;
- private Map<String, Object> state;
- private ProfileWriter profileWriter;
+ private FunctionResolver functionResolver;
+ private Map<String, Object> globals;
+ private VerboseProfile function;
+ private ProfilerClient profilerClient;
+ private List<Map<String, Object>> results;
+ private ProfileMeasurement expected;
- private <T> T run(String expression, Class<T> clazz) {
- return executor.execute(expression, state, clazz);
+ private List run(String expression) {
+ return executor.execute(expression, new HashMap<>(), List.class);
}
- private Map<String, Object> globals;
-
@Before
public void setup() {
- state = new HashMap<>();
- final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, columnFamily);
-
- // used to write values to be read during testing
- long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
- RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
- ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
- profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
+ // the mock profiler client used to feed profile measurement values to the function
+ profilerClient = mock(ProfilerClient.class);
// global properties
- globals = new HashMap<String, Object>() {{
- put(PROFILER_HBASE_TABLE.getKey(), tableName);
- put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
- put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
- put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
- put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
- put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
- }};
+ globals = new HashMap<>();
+ Context context = new Context.Builder()
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals)
+ .build();
+
+ // the VERBOSE_PROFILE function that will be tested
+ function = new VerboseProfile(globals -> profilerClient);
+ function.initialize(context);
// create the stellar execution environment
- executor = new DefaultStellarStatefulExecutor(
- new SimpleFunctionResolver()
- .withClass(VerboseProfile.class)
- .withClass(FixedLookback.class),
- new Context.Builder()
- .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals)
- .build());
- }
+ functionResolver = new SimpleFunctionResolver()
+ .withClass(FixedLookback.class)
+ .withInstance(function);
+ executor = new DefaultStellarStatefulExecutor(functionResolver, context);
- @Test
- public void shouldReturnMeasurementsWhenNotGrouped() {
- final int periodsPerHour = 4;
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- final List<Object> group = Collections.emptyList();
-
- // setup - write some measurements to be read later
- final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement()
+ // create a profile measurement used in the tests
+ expected = new ProfileMeasurement()
.withProfileName("profile1")
.withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, count, group, val -> expectedValue);
-
- // expect to see all values over the past 4 hours
- List<Map<String, Object>> results;
- results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))", List.class);
- Assert.assertEquals(count, results.size());
- for(Map<String, Object> actual: results) {
- Assert.assertEquals("profile1", actual.get(PROFILE_KEY));
- Assert.assertEquals("entity1", actual.get(ENTITY_KEY));
- Assert.assertNotNull(actual.get(PERIOD_KEY));
- Assert.assertNotNull(actual.get(PERIOD_START_KEY));
- Assert.assertNotNull(actual.get(PERIOD_END_KEY));
- Assert.assertNotNull(actual.get(GROUPS_KEY));
- Assert.assertEquals(expectedValue, actual.get(VALUE_KEY));
- }
+ .withPeriod(System.currentTimeMillis(), 5, TimeUnit.MINUTES)
+ .withProfileValue(1231121);
}
@Test
- public void shouldReturnMeasurementsWhenGrouped() {
- final int periodsPerHour = 4;
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- final List<Object> group = Arrays.asList("weekends");
-
- // setup - write some measurements to be read later
- final int count = hours * periodsPerHour;
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, count, group, val -> expectedValue);
-
- // create a variable that contains the groups to use
- state.put("groups", group);
-
- // expect to see all values over the past 4 hours for the group
- List<Map<String, Object>> results;
- results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), groups)", List.class);
- Assert.assertEquals(count, results.size());
- for(Map<String, Object> actual: results) {
- Assert.assertEquals("profile1", actual.get(PROFILE_KEY));
- Assert.assertEquals("entity1", actual.get(ENTITY_KEY));
- Assert.assertNotNull(actual.get(PERIOD_KEY));
- Assert.assertNotNull(actual.get(PERIOD_START_KEY));
- Assert.assertNotNull(actual.get(PERIOD_END_KEY));
- Assert.assertNotNull(actual.get(GROUPS_KEY));
- Assert.assertEquals(expectedValue, actual.get(VALUE_KEY));
- }
+ public void shouldRenderVerboseView() {
+ // only one profile measurement exists
+ when(profilerClient.fetch(
+ eq(Object.class),
+ eq(expected.getProfileName()),
+ eq(expected.getEntity()),
+ eq(expected.getGroups()),
+ any(),
+ any())).thenReturn(Arrays.asList(expected));
+
+ // expect the one measurement to be returned
+ results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+ assertEquals(1, results.size());
+ Map<String, Object> actual = results.get(0);
+
+ // the measurement should be rendered as a map containing detailed information about the profile measurement
+ assertEquals(expected.getProfileName(), actual.get("profile"));
+ assertEquals(expected.getEntity(), actual.get("entity"));
+ assertEquals(expected.getPeriod().getPeriod(), actual.get("period"));
+ assertEquals(expected.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+ assertEquals(expected.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+ assertEquals(expected.getProfileValue(), actual.get("value"));
+ assertEquals(expected.getGroups(), actual.get("groups"));
}
@Test
- public void shouldReturnNothingWhenNoMeasurementsExist() {
- final int expectedValue = 2302;
- final int hours = 2;
- final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
- final List<Object> group = Collections.emptyList();
+ public void shouldRenderVerboseViewWithGroup() {
+ // the profile measurement is part of a group
+ expected.withGroups(Arrays.asList("group1"));
+
+ // only one profile measurement exists
+ when(profilerClient.fetch(
+ eq(Object.class),
+ eq(expected.getProfileName()),
+ eq(expected.getEntity()),
+ eq(expected.getGroups()),
+ any(),
+ any())).thenReturn(Arrays.asList(expected));
+
+ // expect the one measurement to be returned
+ results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['group1'])");
+ assertEquals(1, results.size());
+ Map<String, Object> actual = results.get(0);
+
+ // the measurement should be rendered as a map containing detailed information about the profile measurement
+ assertEquals(expected.getProfileName(), actual.get("profile"));
+ assertEquals(expected.getEntity(), actual.get("entity"));
+ assertEquals(expected.getPeriod().getPeriod(), actual.get("period"));
+ assertEquals(expected.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+ assertEquals(expected.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+ assertEquals(expected.getProfileValue(), actual.get("value"));
+ assertEquals(expected.getGroups(), actual.get("groups"));
+ }
- // setup - write a single value from 2 hours ago
- ProfileMeasurement m = new ProfileMeasurement()
- .withProfileName("profile1")
- .withEntity("entity1")
- .withPeriod(startTime, periodDuration, periodUnits);
- profileWriter.write(m, 1, group, val -> expectedValue);
+ @Test
+ public void shouldRenderVerboseViewWithDifferentGroup() {
+ // the profile measurement is part of a group
+ expected.withGroups(Arrays.asList("group1"));
+
+ // only one profile measurement exists
+ when(profilerClient.fetch(
+ eq(Object.class),
+ eq(expected.getProfileName()),
+ eq(expected.getEntity()),
+ eq(expected.getGroups()),
+ any(),
+ any())).thenReturn(Arrays.asList(expected));
+
+ // the profile measurement is not part of 'group999'
+ results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['group999'])");
+ assertEquals(0, results.size());
+ }
- // expect to get NO measurements over the past 4 seconds
- List<Map<String, Object>> result;
- result = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'SECONDS'))", List.class);
- Assert.assertEquals(0, result.size());
+ @Test
+ public void shouldReturnNothingWhenNoMeasurementsExist() {
+ // no measurements exist
+ when(profilerClient.fetch(
+ eq(Object.class),
+ any(),
+ any(),
+ any(),
+ any(),
+ any())).thenReturn(Collections.emptyList());
+
+ // no measurements exist
+ results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+ assertEquals(0, results.size());
}
@Test
- public void shouldReturnDefaultValueWhenNoMeasurementsExist() {
+ public void shouldReturnDefaultValue() {
// set a default value
- String defaultVal = "this is the default value";
- globals.put("profiler.default.value", defaultVal);
-
- // no profiles exist
- String expr = "PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
- List<Map<String, Object>> results = run(expr, List.class);
-
- // expect to get the default value instead of no results
- Assert.assertTrue(results.size() == 16 || results.size() == 17);
- for(Map<String, Object> actual: results) {
- Assert.assertEquals("profile1", actual.get(PROFILE_KEY));
- Assert.assertEquals("entity1", actual.get(ENTITY_KEY));
- Assert.assertNotNull(actual.get(PERIOD_KEY));
- Assert.assertNotNull(actual.get(PERIOD_START_KEY));
- Assert.assertNotNull(actual.get(PERIOD_END_KEY));
- Assert.assertNotNull(actual.get(GROUPS_KEY));
-
- // expect the default value
- Assert.assertEquals(defaultVal, actual.get(VALUE_KEY));
- }
-
+ globals.put("profiler.default.value", expected);
+
+ // the underlying profile client needs to be setup to return the default
+ when(profilerClient.fetch(
+ eq(Object.class),
+ eq(expected.getProfileName()),
+ eq(expected.getEntity()),
+ eq(expected.getGroups()),
+ any(),
+ eq(Optional.of(expected)))).thenReturn(Arrays.asList(expected));
+
+ results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+ assertEquals(1, results.size());
+ Map<String, Object> actual = results.get(0);
+
+ // no measurements exist, but we expect the default value to be returned
+ assertEquals(expected.getProfileName(), actual.get("profile"));
+ assertEquals(expected.getEntity(), actual.get("entity"));
+ assertEquals(expected.getPeriod().getPeriod(), actual.get("period"));
+ assertEquals(expected.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+ assertEquals(expected.getPeriod().getEndTimeMillis(), actual.get("period.end"));
+ assertEquals(expected.getProfileValue(), actual.get("value"));
+ assertEquals(expected.getGroups(), actual.get("groups"));
}
}
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
index 3f889bc..5a9610e 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
@@ -241,10 +241,18 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
}
}
+ public long getPeriodDurationMillis() {
+ return periodDurationMillis;
+ }
+
public void withPeriodDuration(long duration, TimeUnit units) {
periodDurationMillis = units.toMillis(duration);
}
+ public int getSaltDivisor() {
+ return saltDivisor;
+ }
+
public void setSaltDivisor(int saltDivisor) {
this.saltDivisor = saltDivisor;
}
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
index 0a4a99d..b580db8 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
@@ -34,7 +34,6 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
* The column family storing the profile data.
*/
private String columnFamily;
-
private byte[] columnFamilyBytes;
public ValueOnlyColumnBuilder() {
@@ -47,7 +46,6 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
@Override
public ColumnList columns(ProfileMeasurement measurement) {
-
ColumnList cols = new ColumnList();
cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), SerDeUtils.toBytes(measurement.getProfileValue()));
@@ -66,7 +64,6 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
@Override
public byte[] getColumnQualifier(String fieldName) {
-
if("value".equals(fieldName)) {
return Bytes.toBytes("value");
}
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
index e04404c..31b41ff 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -21,7 +21,6 @@
package org.apache.metron.profiler;
import com.github.benmanes.caffeine.cache.Ticker;
-import com.google.common.util.concurrent.MoreExecutors;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
import org.apache.metron.common.utils.JSONUtils;
@@ -33,7 +32,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.HOURS;
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index b046cc4..0cd811d 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -145,6 +145,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${global_mockito_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-integration-test</artifactId>
<version>${project.parent.version}</version>
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index 571545e..d3cfdca 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -28,6 +28,7 @@ import org.apache.metron.profiler.spark.function.MessageRouterFunction;
import org.apache.metron.profiler.spark.function.ProfileBuilderFunction;
import org.apache.metron.profiler.spark.reader.TelemetryReader;
import org.apache.metron.profiler.spark.reader.TelemetryReaders;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
@@ -97,8 +98,11 @@ public class BatchProfiler implements Serializable {
LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
// write the profile measurements to HBase
+ MapPartitionsFunction<ProfileMeasurementAdapter, Integer> mapper = new HBaseWriterFunction.Builder()
+ .withProperties(profilerProps)
+ .build();
long count = measurements
- .mapPartitions(new HBaseWriterFunction(profilerProps), Encoders.INT())
+ .mapPartitions(mapper, Encoders.INT())
.agg(sum("value"))
.head()
.getLong(0);
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
index 148d970..d120a29 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
@@ -20,13 +20,14 @@
package org.apache.metron.profiler.spark;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClientFactory;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import java.util.Map;
import java.util.Properties;
import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
-import static org.apache.metron.profiler.spark.reader.TelemetryReaders.TEXT;
/**
* Defines the configuration values recognized by the Batch Profiler.
@@ -39,7 +40,9 @@ public enum BatchProfilerConfig {
HBASE_SALT_DIVISOR("profiler.hbase.salt.divisor", 1000, Integer.class),
- HBASE_TABLE_PROVIDER("profiler.hbase.table.provider", "org.apache.metron.hbase.HTableProvider", String.class),
+ HBASE_CONNECTION_FACTORY("profiler.hbase.connection.provider", HBaseConnectionFactory.class.getName(), String.class),
+
+ HBASE_CLIENT_FACTORY("profiler.hbase.client.factory", HBaseTableClientFactory.class, String.class),
HBASE_TABLE_NAME("profiler.hbase.table", "profiler", String.class),
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
index 6a090cf..bb66af8 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
@@ -21,12 +21,13 @@ package org.apache.metron.profiler.spark.function;
import org.apache.commons.collections4.IteratorUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.client.LegacyHBaseClient;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.apache.metron.hbase.client.HBaseTableClientFactory;
import org.apache.metron.profiler.ProfileMeasurement;
import org.apache.metron.profiler.hbase.ColumnBuilder;
import org.apache.metron.profiler.hbase.RowKeyBuilder;
@@ -39,16 +40,16 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_CLIENT_FACTORY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_CONNECTION_FACTORY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_SALT_DIVISOR;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_WRITE_DURABILITY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
@@ -60,7 +61,93 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private TableProvider tableProvider;
+ public static class Builder {
+ private HBaseConnectionFactory connectionFactory = new HBaseConnectionFactory();
+ private HBaseClientFactory hBaseClientFactory = new HBaseTableClientFactory();
+ private String tableName = String.class.cast(HBASE_TABLE_NAME.getDefault());
+ private Durability durability = Durability.class.cast(HBASE_WRITE_DURABILITY.getDefault());
+ private RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
+ private ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder();
+
+ public Builder withConnectionFactory(HBaseConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ return this;
+ }
+
+ public Builder withClientFactory(HBaseClientFactory clientFactory) {
+ this.hBaseClientFactory = clientFactory;
+ return this;
+ }
+
+ public Builder withRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
+ this.rowKeyBuilder = rowKeyBuilder;
+ return this;
+ }
+
+ public Builder withColumnBuilder(ColumnBuilder columnBuilder) {
+ this.columnBuilder = columnBuilder;
+ return this;
+ }
+
+ public Builder withProperties(Properties properties) {
+ // row key builder
+ int saltDivisor = HBASE_SALT_DIVISOR.get(properties, Integer.class);
+ int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
+ TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
+ rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodDurationUnits);
+
+ // column builder
+ String columnFamily = HBASE_COLUMN_FAMILY.get(properties, String.class);
+ columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+
+ // hbase
+ tableName = HBASE_TABLE_NAME.get(properties, String.class);
+ durability = HBASE_WRITE_DURABILITY.get(properties, Durability.class);
+
+ // connection factory
+ String factoryImpl = HBASE_CONNECTION_FACTORY.get(properties, String.class);
+ connectionFactory = createConnectionFactory(factoryImpl);
+
+ // client creator
+ String creatorImpl = HBASE_CLIENT_FACTORY.get(properties, String.class);
+ hBaseClientFactory = HBaseClientFactory.byName(creatorImpl, () -> new HBaseTableClientFactory());
+
+ return this;
+ }
+
+ private static HBaseConnectionFactory createConnectionFactory(String factoryImpl) {
+ LOG.trace("Creating table provider; className={}", factoryImpl);
+
+ // if class name not defined, use a reasonable default
+ if(StringUtils.isEmpty(factoryImpl) || factoryImpl.charAt(0) == '$') {
+ return new HBaseConnectionFactory();
+ }
+
+ // instantiate the table provider
+ return HBaseConnectionFactory.byName(factoryImpl);
+ }
+
+ public HBaseWriterFunction build() {
+ HBaseWriterFunction function = new HBaseWriterFunction();
+ function.connectionFactory = connectionFactory;
+ function.hBaseClientFactory = hBaseClientFactory;
+ function.tableName = tableName;
+ function.durability = durability;
+ function.rowKeyBuilder = rowKeyBuilder;
+ function.columnBuilder = columnBuilder;
+ return function;
+ }
+ }
+
+ /**
+ * Establishes connections to HBase.
+ */
+ private HBaseConnectionFactory connectionFactory;
+
+ /**
+ * Creates the {@link HBaseTableClient} when it is needed.
+ */
+ private HBaseClientFactory hBaseClientFactory;
/**
* The name of the HBase table to write to.
@@ -82,23 +169,11 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
*/
private ColumnBuilder columnBuilder;
- public HBaseWriterFunction(Properties properties) {
- tableName = HBASE_TABLE_NAME.get(properties, String.class);
- durability = HBASE_WRITE_DURABILITY.get(properties, Durability.class);
-
- // row key builder
- int saltDivisor = HBASE_SALT_DIVISOR.get(properties, Integer.class);
- int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
- TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
- rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodDurationUnits);
-
- // column builder
- String columnFamily = HBASE_COLUMN_FAMILY.get(properties, String.class);
- columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-
- // hbase table provider
- String providerImpl = HBASE_TABLE_PROVIDER.get(properties, String.class);
- tableProvider = createTableProvider(providerImpl);
+ /**
+ * Use the {@link HBaseWriterFunction.Builder} instead.
+ */
+ private HBaseWriterFunction() {
+ // nothing to do
}
/**
@@ -117,8 +192,7 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
if(measurements.size() > 0) {
// open an HBase connection
- Configuration config = HBaseConfiguration.create();
- try (LegacyHBaseClient client = new LegacyHBaseClient(tableProvider, config, tableName)) {
+ try (HBaseClient client = hBaseClientFactory.create(connectionFactory, HBaseConfiguration.create(), tableName)) {
for (ProfileMeasurementAdapter adapter : measurements) {
ProfileMeasurement m = adapter.toProfileMeasurement();
@@ -135,37 +209,4 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
LOG.debug("{} profile measurement(s) written to HBase", count);
return IteratorUtils.singletonIterator(count);
}
-
- /**
- * Set the {@link TableProvider} using the class name of the provider.
- * @param providerImpl The name of the class.
- * @return
- */
- public HBaseWriterFunction withTableProviderImpl(String providerImpl) {
- this.tableProvider = createTableProvider(providerImpl);
- return this;
- }
-
- /**
- * Creates a TableProvider based on a class name.
- * @param providerImpl The class name of a TableProvider
- */
- private static TableProvider createTableProvider(String providerImpl) {
- LOG.trace("Creating table provider; className={}", providerImpl);
-
- // if class name not defined, use a reasonable default
- if(StringUtils.isEmpty(providerImpl) || providerImpl.charAt(0) == '$') {
- return new HTableProvider();
- }
-
- // instantiate the table provider
- try {
- Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(providerImpl);
- return clazz.getConstructor().newInstance();
-
- } catch (InstantiationException | IllegalAccessException | IllegalStateException |
- InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
- throw new IllegalStateException("Unable to instantiate connector", e);
- }
- }
}
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index b36cf8c..673a0b6 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -20,10 +20,16 @@
package org.apache.metron.profiler.spark;
import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.client.FakeHBaseClient;
+import org.apache.metron.hbase.client.FakeHBaseClientFactory;
+import org.apache.metron.hbase.client.FakeHBaseConnectionFactory;
+import org.apache.metron.profiler.client.HBaseProfilerClient;
+import org.apache.metron.profiler.client.ProfilerClient;
import org.apache.metron.profiler.client.stellar.FixedLookback;
import org.apache.metron.profiler.client.stellar.GetProfile;
import org.apache.metron.profiler.client.stellar.WindowLookback;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
@@ -46,14 +52,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import static org.apache.metron.common.configuration.profiler.ProfilerConfig.fromJSON;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_CONNECTION_FACTORY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_CLIENT_FACTORY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_CONNECTION_FACTORY;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_BEGIN;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_END;
import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
@@ -101,28 +109,40 @@ public class BatchProfilerIntegrationTest {
public void setup() {
readerProperties = new Properties();
profilerProperties = new Properties();
-
- // the output will be written to a mock HBase table
String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
- profilerProperties.put(HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
-
- // create the mock hbase table
- MockHBaseTableProvider.addToCache(tableName, columnFamily);
+ profilerProperties.put(HBASE_CONNECTION_FACTORY.getKey(), FakeHBaseConnectionFactory.class.getName());
// define the globals required by `PROFILE_GET`
Map<String, Object> global = new HashMap<String, Object>() {{
put(PROFILER_HBASE_TABLE.getKey(), tableName);
put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
- put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+ put(PROFILER_HBASE_CONNECTION_FACTORY.getKey(), FakeHBaseConnectionFactory.class.getName());
}};
+ // the batch profiler needs to use the `FakeHBaseClient` for these tests
+ profilerProperties.put(HBASE_CLIENT_FACTORY.getKey(), FakeHBaseClientFactory.class.getName());
+
+ // ensure that all of the static records are deleted before running the test
+ FakeHBaseClient hbaseClient = new FakeHBaseClient();
+ hbaseClient.deleteAll();
+
+ // create a `ProfilerClient` that uses the `FakeHBaseClient`
+ ProfilerClient profilerClient = new HBaseProfilerClient(
+ hbaseClient,
+ new SaltyRowKeyBuilder(),
+ new ValueOnlyColumnBuilder(),
+ TimeUnit.MINUTES.toMillis(15));
+
+ // create an instance of `PROFILE_GET` that indirectly uses the `FakeHBaseClient`
+ GetProfile profileGetFunction = new GetProfile(globals -> profilerClient);
+
// create the stellar execution environment
executor = new DefaultStellarStatefulExecutor(
new SimpleFunctionResolver()
- .withClass(GetProfile.class)
.withClass(FixedLookback.class)
- .withClass(WindowLookback.class),
+ .withClass(WindowLookback.class)
+ .withInstance(profileGetFunction),
new Context.Builder()
.with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
.build());
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
index 55f3e21..c0f0370 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
@@ -21,8 +21,14 @@ package org.apache.metron.profiler.spark.function;
import org.apache.commons.collections4.IteratorUtils;
import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.client.FakeHBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
import org.json.simple.JSONObject;
import org.junit.Assert;
@@ -32,64 +38,71 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
+import static org.apache.metron.hbase.client.FakeHBaseClient.Mutation;
public class HBaseWriterFunctionTest {
- Properties profilerProperties;
+ private HBaseWriterFunction function;
+ private RowKeyBuilder rowKeyBuilder;
+ private ColumnBuilder columnBuilder;
+ private FakeHBaseClient hbaseClient;
+ private HBaseClientFactory hBaseClientFactory;
+
+ private static final JSONObject message = getMessage();
+ private static final String entity = (String) message.get("ip_src_addr");
+ private static final long timestamp = (Long) message.get("timestamp");
+ private static final ProfileConfig profile = getProfile();
@Before
public void setup() {
- profilerProperties = getProfilerProperties();
-
- // create a mock table for HBase
- String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
- String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
- MockHBaseTableProvider.addToCache(tableName, columnFamily);
+ hbaseClient = new FakeHBaseClient();
+ hbaseClient.deleteAll();
+ hBaseClientFactory = (x, y, z) -> hbaseClient;
+ rowKeyBuilder = new SaltyRowKeyBuilder();
+ columnBuilder = new ValueOnlyColumnBuilder();
+ function = new HBaseWriterFunction.Builder()
+ .withRowKeyBuilder(rowKeyBuilder)
+ .withColumnBuilder(columnBuilder)
+ .withClientFactory(hBaseClientFactory)
+ .build();
}
@Test
public void testWrite() throws Exception {
-
- JSONObject message = getMessage();
- String entity = (String) message.get("ip_src_addr");
- long timestamp = (Long) message.get("timestamp");
- ProfileConfig profile = getProfile();
-
- // setup the profile measurements that will be written
+ // write a profile measurement
List<ProfileMeasurementAdapter> measurements = createMeasurements(1, entity, timestamp, profile);
-
- // setup the function to test
- HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
- function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
-
- // write the measurements
Iterator<Integer> results = function.call(measurements.iterator());
- // validate the result
+ // validate the results
List<Integer> counts = IteratorUtils.toList(results);
Assert.assertEquals(1, counts.size());
Assert.assertEquals(1, counts.get(0).intValue());
+
+ // 1 record should have been written to the hbase client
+ List<Mutation> written = hbaseClient.getAllPersisted();
+ Assert.assertEquals(1, written.size());
+
+ // validate the row key used to write to hbase
+ ProfileMeasurement m = measurements.get(0).toProfileMeasurement();
+ byte[] expectedRowKey = rowKeyBuilder.rowKey(m);
+ Assert.assertArrayEquals(expectedRowKey, written.get(0).rowKey);
+
+ // validate the columns used to write to hbase.
+ ColumnList expectedCols = columnBuilder.columns(m);
+ Assert.assertEquals(expectedCols, written.get(0).columnList);
}
@Test
public void testWriteMany() throws Exception {
-
- JSONObject message = getMessage();
- String entity = (String) message.get("ip_src_addr");
- long timestamp = (Long) message.get("timestamp");
- ProfileConfig profile = getProfile();
-
// setup the profile measurements that will be written
List<ProfileMeasurementAdapter> measurements = createMeasurements(10, entity, timestamp, profile);
// setup the function to test
- HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
- function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
+ HBaseWriterFunction function = new HBaseWriterFunction.Builder()
+ .withClientFactory(hBaseClientFactory)
+ .build();
// write the measurements
Iterator<Integer> results = function.call(measurements.iterator());
@@ -102,13 +115,13 @@ public class HBaseWriterFunctionTest {
@Test
public void testWriteNone() throws Exception {
-
// there are no profile measurements to write
List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
// setup the function to test
- HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
- function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
+ HBaseWriterFunction function = new HBaseWriterFunction.Builder()
+ .withClientFactory(hBaseClientFactory)
+ .build();
// write the measurements
Iterator<Integer> results = function.call(measurements.iterator());
@@ -147,7 +160,7 @@ public class HBaseWriterFunctionTest {
/**
* Returns a telemetry message to use for testing.
*/
- private JSONObject getMessage() {
+ private static JSONObject getMessage() {
JSONObject message = new JSONObject();
message.put("ip_src_addr", "192.168.1.1");
message.put("status", "red");
@@ -156,16 +169,9 @@ public class HBaseWriterFunctionTest {
}
/**
- * Returns profiler properties to use for testing.
- */
- private Properties getProfilerProperties() {
- return new Properties();
- }
-
- /**
* Returns a profile definition to use for testing.
*/
- private ProfileConfig getProfile() {
+ private static ProfileConfig getProfile() {
return new ProfileConfig()
.withProfile("profile1")
.withForeach("ip_src_addr")
diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml
index 09bd238..d71dfc4 100644
--- a/metron-analytics/metron-profiler-storm/pom.xml
+++ b/metron-analytics/metron-profiler-storm/pom.xml
@@ -29,6 +29,16 @@
</properties>
<dependencies>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${global_jackson_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${global_jackson_version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava_version}</version>
diff --git a/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties b/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties
index dc30838..91a5d3b 100644
--- a/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties
@@ -61,6 +61,8 @@ profiler.hbase.table=profiler
profiler.hbase.column.family=P
profiler.hbase.batch=10
profiler.hbase.flush.interval.seconds=30
+profiler.hbase.client.factory=org.apache.metron.hbase.client.HBaseTableClientFactory
+profiler.hbase.connection.factory=org.apache.metron.hbase.client.HBaseConnectionFactory
##### Kafka #####
diff --git a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
index e16a782..b3faf34 100644
--- a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
@@ -137,6 +137,9 @@ components:
- ${profiler.window.lag}
- "${profiler.window.lag.units}"
+ - id: "hbaseClientFactory"
+ className: "${profiler.hbase.client.factory}"
+
spouts:
- id: "kafkaSpout"
@@ -179,8 +182,10 @@ bolts:
- "${profiler.hbase.table}"
- ref: "hbaseMapper"
configMethods:
- - name: "withTableProvider"
- args: ["${hbase.provider.impl}"]
+ - name: "withClientFactory"
+ args: [ref: "hbaseClientFactory"]
+ - name: "withConnectionFactory"
+ args: ["${profiler.hbase.connection.factory}"]
- name: "withBatchSize"
args: [${profiler.hbase.batch}]
- name: "withFlushIntervalSecs"
diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
index 07bd552..d5c2949 100644
--- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
+++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
@@ -20,18 +20,15 @@
package org.apache.metron.hbase.bolt;
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Durability;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
import org.apache.metron.hbase.ColumnList;
import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
-import org.apache.metron.hbase.client.LegacyHBaseClient;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.apache.metron.hbase.client.HBaseTableClientFactory;
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
@@ -41,6 +38,10 @@ import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.Optional;
+
/**
* A bolt that writes to HBase.
*
@@ -76,25 +77,32 @@ public class HBaseBolt extends BaseRichBolt {
protected HBaseMapper mapper;
/**
- * The name of the class that should be used as a table provider.
- *
- * <p>Defaults to 'org.apache.metron.hbase.HTableProvider'.
+ * Defines when a batch needs flushed.
*/
- protected String tableProviderClazzName = "org.apache.metron.hbase.HTableProvider";
+ private BatchHelper batchHelper;
/**
- * The TableProvider
- * May be loaded from tableProviderClazzName or provided
+ * Establishes a connection to HBase.
*/
- protected TableProvider tableProvider;
+ private HBaseConnectionFactory connectionFactory;
+
+ /**
+ * Creates the {@link HBaseTableClient} used by this bolt.
+ */
+ private HBaseClientFactory hBaseClientFactory;
+
+ /**
+ * Used to write to HBase.
+ */
+ protected transient HBaseClient hbaseClient;
- private BatchHelper batchHelper;
protected OutputCollector collector;
- protected transient LegacyHBaseClient hbaseClient;
public HBaseBolt(String tableName, HBaseMapper mapper) {
this.tableName = tableName;
this.mapper = mapper;
+ this.connectionFactory = new HBaseConnectionFactory();
+ this.hBaseClientFactory = new HBaseTableClientFactory();
}
public HBaseBolt writeToWAL(boolean writeToWAL) {
@@ -102,34 +110,34 @@ public class HBaseBolt extends BaseRichBolt {
return this;
}
- public HBaseBolt withTableProvider(String tableProvider) {
- this.tableProviderClazzName = tableProvider;
+ public HBaseBolt withBatchSize(int batchSize) {
+ this.batchSize = batchSize;
return this;
}
- public HBaseBolt withTableProviderInstance(TableProvider tableProvider){
- this.tableProvider = tableProvider;
+ public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
+ this.flushIntervalSecs = flushIntervalSecs;
return this;
}
- public HBaseBolt withBatchSize(int batchSize) {
- this.batchSize = batchSize;
+ public HBaseBolt withConnectionFactory(HBaseConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
return this;
}
- public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
- this.flushIntervalSecs = flushIntervalSecs;
+ public HBaseBolt withConnectionFactory(String connectionFactoryImpl) {
+ this.connectionFactory = HBaseConnectionFactory.byName(connectionFactoryImpl);
return this;
}
- public void setClient(LegacyHBaseClient hbaseClient) {
- this.hbaseClient = hbaseClient;
+ public HBaseBolt withClientFactory(HBaseClientFactory clientFactory) {
+ this.hBaseClientFactory = clientFactory;
+ return this;
}
@Override
public Map<String, Object> getComponentConfiguration() {
LOG.debug("Tick tuples expected every {} second(s)", flushIntervalSecs);
-
Config conf = new Config();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
return conf;
@@ -139,15 +147,7 @@ public class HBaseBolt extends BaseRichBolt {
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.collector = collector;
this.batchHelper = new BatchHelper(batchSize, collector);
-
- TableProvider provider;
- if(this.tableProvider == null) {
- provider = createTableProvider(tableProviderClazzName);
- } else {
- provider = this.tableProvider;
- }
-
- hbaseClient = new LegacyHBaseClient(provider, HBaseConfiguration.create(), tableName);
+ this.hbaseClient = hBaseClientFactory.create(connectionFactory, HBaseConfiguration.create(), tableName);
}
@Override
@@ -158,12 +158,10 @@ public class HBaseBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
LOG.trace("Received a tuple.");
-
try {
if (batchHelper.shouldHandle(tuple)) {
save(tuple);
}
-
if (batchHelper.shouldFlush()) {
flush();
}
@@ -182,7 +180,6 @@ public class HBaseBolt extends BaseRichBolt {
byte[] rowKey = mapper.rowKey(tuple);
ColumnList cols = mapper.columns(tuple);
Durability durability = writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL;
-
Optional<Long> ttl = mapper.getTTL(tuple);
if(ttl.isPresent()) {
hbaseClient.addMutation(rowKey, cols, durability, ttl.get());
@@ -199,31 +196,7 @@ public class HBaseBolt extends BaseRichBolt {
*/
private void flush() {
LOG.debug("About to flush a batch of {} mutation(s)", batchHelper.getBatchSize());
-
this.hbaseClient.mutate();
batchHelper.ack();
}
-
- /**
- * Creates a TableProvider based on a class name.
- * @param connectorImpl The class name of a TableProvider
- */
- private static TableProvider createTableProvider(String connectorImpl) {
- LOG.trace("Creating table provider; className={}", connectorImpl);
-
- // if class name not defined, use a reasonable default
- if(StringUtils.isEmpty(connectorImpl) || connectorImpl.charAt(0) == '$') {
- return new HTableProvider();
- }
-
- // instantiate the table provider
- try {
- Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
- return clazz.getConstructor().newInstance();
-
- } catch (InstantiationException | IllegalAccessException | IllegalStateException |
- InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
- throw new IllegalStateException("Unable to instantiate connector", e);
- }
- }
}
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
index 9146aff..b5bf898 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
@@ -20,13 +20,10 @@
package org.apache.metron.hbase.bolt;
-import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.test.bolt.BaseBoltTest;
import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;
-import org.apache.metron.hbase.bolt.mapper.Widget;
-import org.apache.metron.hbase.bolt.mapper.WidgetMapper;
-import org.apache.metron.hbase.client.LegacyHBaseClient;
-import org.apache.metron.test.bolt.BaseBoltTest;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -48,12 +45,11 @@ import static org.mockito.Mockito.when;
public class HBaseBoltTest extends BaseBoltTest {
private static final String tableName = "widgets";
- private LegacyHBaseClient client;
+ private HBaseClient client;
private Tuple tuple1;
private Tuple tuple2;
private Widget widget1;
private Widget widget2;
- private TableProvider provider;
@Before
public void setupTuples() throws Exception {
@@ -68,11 +64,10 @@ public class HBaseBoltTest extends BaseBoltTest {
}
@Before
- public void setup() throws Exception {
+ public void setup() {
tuple1 = mock(Tuple.class);
tuple2 = mock(Tuple.class);
- client = mock(LegacyHBaseClient.class);
- provider = mock(TableProvider.class);
+ client = mock(HBaseClient.class);
}
/**
@@ -80,9 +75,9 @@ public class HBaseBoltTest extends BaseBoltTest {
*/
private HBaseBolt createBolt(int batchSize, WidgetMapper mapper) throws IOException {
HBaseBolt bolt = new HBaseBolt(tableName, mapper)
- .withBatchSize(batchSize).withTableProviderInstance(provider);
+ .withBatchSize(batchSize)
+ .withClientFactory((f, c, t) -> client);
bolt.prepare(Collections.emptyMap(), topologyContext, outputCollector);
- bolt.setClient(client);
return bolt;
}
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/Widget.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/Widget.java
new file mode 100644
index 0000000..e25d4d5
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/Widget.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt;
+
+/**
+ * A simple POJO used for testing.
+ */
+public class Widget {
+
+ /**
+ * The name of the widget.
+ */
+ private String name;
+
+ /**
+ * The cost of the widget.
+ */
+ private int cost;
+
+ public Widget(String name, int cost) {
+ this.name = name;
+ this.cost = cost;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getCost() {
+ return cost;
+ }
+
+ public void setCost(int cost) {
+ this.cost = cost;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Widget widget = (Widget) o;
+ if (cost != widget.cost) return false;
+ return name != null ? name.equals(widget.name) : widget.name == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name != null ? name.hashCode() : 0;
+ result = 31 * result + cost;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Widget{" +
+ "name='" + name + '\'' +
+ ", cost=" + cost +
+ '}';
+ }
+}
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/WidgetMapper.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/WidgetMapper.java
new file mode 100644
index 0000000..ac6b561
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/WidgetMapper.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt;
+
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
+import org.apache.storm.tuple.Tuple;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+
+import java.util.Optional;
+
+
+/**
+ * Maps a Widget to HBase. Used only for testing.
+ */
+public class WidgetMapper implements HBaseMapper {
+
+ private Optional<Long> ttl;
+
+ public WidgetMapper() {
+ this.ttl = Optional.empty();
+ }
+
+ public WidgetMapper(Long ttl) {
+ this.ttl = Optional.of(ttl);
+ }
+
+ @Override
+ public byte[] rowKey(Tuple tuple) {
+ Widget w = (Widget) tuple.getValueByField("widget");
+ return Bytes.toBytes(w.getName());
+ }
+
+ @Override
+ public ColumnList columns(Tuple tuple) {
+ Widget w = (Widget) tuple.getValueByField("widget");
+
+ ColumnList cols = new ColumnList();
+ cols.addColumn(CF, QNAME, Bytes.toBytes(w.getName()));
+ cols.addColumn(CF, QCOST, Bytes.toBytes(w.getCost()));
+ return cols;
+ }
+
+ @Override
+ public Optional<Long> getTTL(Tuple tuple) {
+ return ttl;
+ }
+
+ public static final String CF_STRING = "cfWidget";
+ public static final byte[] CF = Bytes.toBytes(CF_STRING);
+ public static final byte[] QNAME = Bytes.toBytes("qName");
+ public static final byte[] QCOST = Bytes.toBytes("qCost");
+}
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
index ea4ad4e..a1bd458 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@ -24,14 +24,16 @@ import org.adrianwalker.multilinestring.Multiline;
import org.apache.commons.io.FileUtils;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.client.FakeHBaseClient;
+import org.apache.metron.hbase.client.FakeHBaseClientFactory;
+import org.apache.metron.hbase.client.FakeHBaseConnectionFactory;
import org.apache.metron.integration.BaseIntegrationTest;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.FluxTopologyComponent;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.profiler.client.HBaseProfilerClientFactory;
import org.apache.metron.profiler.client.stellar.FixedLookback;
import org.apache.metron.profiler.client.stellar.GetProfile;
import org.apache.metron.profiler.client.stellar.WindowLookback;
@@ -39,6 +41,7 @@ import org.apache.metron.statistics.OnlineStatisticsProvider;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
import org.apache.storm.Config;
import org.json.simple.JSONObject;
@@ -64,7 +67,6 @@ import java.util.concurrent.TimeUnit;
import static org.apache.metron.integration.utils.TestUtils.assertEventually;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
@@ -109,7 +111,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
private static KafkaComponent kafkaComponent;
private static ConfigUploadComponent configUploadComponent;
private static ComponentRunner runner;
- private static MockHTable profilerTable;
private static String message1;
private static String message2;
private static String message3;
@@ -393,7 +394,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@BeforeClass
public static void setupBeforeClass() throws UnableToStartException {
-
// create some messages that contain a timestamp - a really old timestamp; close to 1970
message1 = getMessage(entity, startAt);
message2 = getMessage(entity, startAt + 100);
@@ -428,7 +428,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
setProperty("profiler.hbase.column.family", columnFamily);
setProperty("profiler.hbase.batch", "10");
setProperty("profiler.hbase.flush.interval.seconds", "1");
- setProperty("hbase.provider.impl", "" + MockHBaseTableProvider.class.getName());
+ setProperty("profiler.hbase.connection.factory", FakeHBaseConnectionFactory.class.getName());
+ setProperty("profiler.hbase.client.factory", FakeHBaseClientFactory.class.getName());
// profile settings
setProperty("profiler.period.duration", Long.toString(periodDurationMillis));
@@ -442,9 +443,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
setProperty("profiler.max.routes.per.bolt", Long.toString(maxRoutesPerBolt));
}};
- // create the mock table
- profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
-
zkComponent = getZKServerComponent(topologyProperties);
// create the input and output topics
@@ -477,8 +475,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
}
@AfterClass
- public static void tearDownAfterClass() throws Exception {
- MockHBaseTableProvider.clear();
+ public static void tearDownAfterClass() {
if (runner != null) {
runner.stop();
}
@@ -486,14 +483,10 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@Before
public void setup() {
- // create the mock table
- profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
-
// global properties
Map<String, Object> global = new HashMap<String, Object>() {{
put(PROFILER_HBASE_TABLE.getKey(), tableName);
put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
- put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
// client needs to use the same period duration
put(PROFILER_PERIOD.getKey(), Long.toString(periodDurationMillis));
@@ -503,21 +496,28 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
put(PROFILER_SALT_DIVISOR.getKey(), saltDivisor);
}};
+ Context context = new Context.Builder()
+ .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+ .build();
+
+ // create the GET_PROFILE function
+ GetProfile getProfileFunction = new GetProfile(new HBaseProfilerClientFactory(new FakeHBaseClientFactory()));
+
+ // ensure the functions that we need can be resolved
+ FunctionResolver functionResolver = new SimpleFunctionResolver()
+ .withClass(FixedLookback.class)
+ .withClass(WindowLookback.class)
+ .withInstance(getProfileFunction);
+
// create the stellar execution environment
- executor = new DefaultStellarStatefulExecutor(
- new SimpleFunctionResolver()
- .withClass(GetProfile.class)
- .withClass(FixedLookback.class)
- .withClass(WindowLookback.class),
- new Context.Builder()
- .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
- .build());
+ executor = new DefaultStellarStatefulExecutor(functionResolver, context);
+
+ // ensure that all HBase "records" are cleared before starting the test
+ new FakeHBaseClient().deleteAll();
}
@After
public void tearDown() throws Exception {
- MockHBaseTableProvider.clear();
- profilerTable.clear();
if (runner != null) {
runner.reset();
}
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
index d8bc13d..d89dcab 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
@@ -61,6 +61,8 @@ profiler.hbase.table={{profiler_hbase_table}}
profiler.hbase.column.family={{profiler_hbase_cf}}
profiler.hbase.batch={{profiler_hbase_batch}}
profiler.hbase.flush.interval.seconds={{profiler_hbase_flush_interval}}
+profiler.hbase.client.factory=org.apache.metron.hbase.client.HBaseTableClientFactory
+profiler.hbase.connection.factory=org.apache.metron.hbase.client.HBaseConnectionFactory
##### Kafka #####
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
index 60d2328..4591f9e 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
@@ -82,7 +82,7 @@ public class HBaseTableClient implements HBaseClient {
connection.close();
}
} catch(IOException e) {
- LOG.error("Error while closing HBase connection",e);
+ LOG.error("Error while closing HBase connection", e);
}
}
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctionInfo.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctionInfo.java
index 8606723..042706d 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctionInfo.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctionInfo.java
@@ -77,6 +77,11 @@ public class StellarFunctionInfo {
return function;
}
+ public StellarFunctionInfo setFunction(StellarFunction function) {
+ this.function = function;
+ return this;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -90,7 +95,6 @@ public class StellarFunctionInfo {
// Probably incorrect - comparing Object[] arrays with Arrays.equals
if (!Arrays.equals(params, that.params)) return false;
return function != null ? function.equals(that.function) : that.function == null;
-
}
@Override
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/BaseFunctionResolver.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/BaseFunctionResolver.java
index 38a32d1..6fd669b 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/BaseFunctionResolver.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/BaseFunctionResolver.java
@@ -52,7 +52,7 @@ public abstract class BaseFunctionResolver implements FunctionResolver, Serializ
/**
* Maps a function name to the metadata necessary to execute the Stellar function.
*/
- private Supplier<Map<String, StellarFunctionInfo>> functions;
+ protected Supplier<Map<String, StellarFunctionInfo>> functions;
/**
* The Stellar execution context that can be used to inform the function resolution process.
@@ -145,6 +145,7 @@ public abstract class BaseFunctionResolver implements FunctionResolver, Serializ
*/
@Override
public StellarFunction apply(String functionName) {
+ LOG.debug("Resolving function; functionName={}", functionName);
StellarFunctionInfo info = functions.get().get(functionName);
if(info == null) {
throw new IllegalStateException(format("Unknown function: `%s`", functionName));
@@ -156,7 +157,6 @@ public abstract class BaseFunctionResolver implements FunctionResolver, Serializ
* Performs the core process of function resolution.
*/
protected Map<String, StellarFunctionInfo> resolveFunctions() {
-
// maps a function name to its definition
Map<String, StellarFunctionInfo> functions = new HashMap<>();
@@ -242,4 +242,18 @@ public abstract class BaseFunctionResolver implements FunctionResolver, Serializ
return null;
}
}
+
+ @Override
+ public BaseFunctionResolver withInstance(StellarFunction function) {
+ // perform function resolution on the instance that was passed in
+ StellarFunctionInfo functionInfo = resolveFunction(function.getClass());
+ functionInfo.setFunction(function);
+
+ // add the function to the set of resolvable functions
+ Map<String, StellarFunctionInfo> currentFunctions = this.functions.get();
+ currentFunctions.put(functionInfo.getName(), functionInfo);
+
+ this.functions = Suppliers.memoize(() -> currentFunctions);
+ return this;
+ }
}
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/FunctionResolver.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/FunctionResolver.java
index 4047586..38eb067 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/FunctionResolver.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/FunctionResolver.java
@@ -17,13 +17,14 @@
*/
package org.apache.metron.stellar.dsl.functions.resolver;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.function.Function;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.StellarFunction;
import org.apache.metron.stellar.dsl.StellarFunctionInfo;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.Function;
+
/**
* Responsible for function resolution in Stellar.
*/
@@ -46,6 +47,16 @@ public interface FunctionResolver extends Function<String, StellarFunction>, Clo
void initialize(Context context);
/**
+ * Attempts to resolve a function defined within the provided {@link StellarFunction}
+ * instance.
+ *
+ * <p>This can be useful for instrumenting a Stellar function before it is tested.
+ *
+ * @param function The Stellar function to resolve.
+ */
+ FunctionResolver withInstance(StellarFunction function);
+
+ /**
* Perform any cleanup necessary for the loaded Stellar functions.
*/
@Override
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/SimpleFunctionResolver.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/SimpleFunctionResolver.java
index d2d0e62..a5a5b0c 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/SimpleFunctionResolver.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/SimpleFunctionResolver.java
@@ -18,13 +18,17 @@
package org.apache.metron.stellar.dsl.functions.resolver;
-import java.lang.invoke.MethodHandles;
-import java.util.HashSet;
-import java.util.Set;
+import com.google.common.base.Suppliers;
import org.apache.metron.stellar.dsl.StellarFunction;
+import org.apache.metron.stellar.dsl.StellarFunctionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
/**
* A simple Stellar function resolver that resolves functions from specific
* classes rather than by searching the classpath.
@@ -49,7 +53,8 @@ public class SimpleFunctionResolver extends BaseFunctionResolver {
}
/**
- * Will attempt to resolve any Stellar functions defined within the specified class.
+ * Attempts to resolve any functions defined within a specific class.
+ *
* @param clazz The class which may contain a Stellar function.
*/
public SimpleFunctionResolver withClass(Class<? extends StellarFunction> clazz) {