You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/10/08 22:32:53 UTC
metron git commit: METRON-1792 Simplify Profile Definitions in Integration Tests (nickwallen) closes apache/metron#1211
Repository: metron
Updated Branches:
refs/heads/master 269b91d01 -> 5bfc08c57
METRON-1792 Simplify Profile Definitions in Integration Tests (nickwallen) closes apache/metron#1211
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/5bfc08c5
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/5bfc08c5
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/5bfc08c5
Branch: refs/heads/master
Commit: 5bfc08c57f1129b7d185ac7257197775ed3bdb5e
Parents: 269b91d
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Oct 8 18:32:30 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Mon Oct 8 18:32:30 2018 -0400
----------------------------------------------------------------------
.../zookeeper/event-time-test/profiler.json | 19 ----
.../processing-time-test/profiler.json | 11 --
.../zookeeper/profile-with-stats/profiler.json | 12 --
.../integration/ConfigUploadComponent.java | 31 ++++--
.../integration/ProfilerIntegrationTest.java | 109 ++++++++++++++++---
.../ZKConfigurationsCacheIntegrationTest.java | 5 +-
.../src/test/resources/profiler/profiler.json | 19 ++++
7 files changed, 137 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json
deleted file mode 100644
index 534b7c6..0000000
--- a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/event-time-test/profiler.json
+++ /dev/null
@@ -1,19 +0,0 @@
-{
- "timestampField": "timestamp",
- "profiles": [
- {
- "profile": "count-by-ip",
- "foreach": "ip_src_addr",
- "init": { "count": 0 },
- "update": { "count" : "count + 1" },
- "result": "count"
- },
- {
- "profile": "total-count",
- "foreach": "'total'",
- "init": { "count": 0 },
- "update": { "count": "count + 1" },
- "result": "count"
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json
deleted file mode 100644
index e75ec0f..0000000
--- a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/processing-time-test/profiler.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "profiles": [
- {
- "profile": "processing-time-test",
- "foreach": "ip_src_addr",
- "init": { "counter": "0" },
- "update": { "counter": "counter + 1" },
- "result": "counter"
- }
- ]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json b/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json
deleted file mode 100644
index 083e73f..0000000
--- a/metron-analytics/metron-profiler-storm/src/test/config/zookeeper/profile-with-stats/profiler.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "profiles": [
- {
- "profile": "profile-with-stats",
- "foreach": "'global'",
- "init": { "stats": "STATS_INIT()" },
- "update": { "stats": "STATS_ADD(stats, 1)" },
- "result": "stats"
- }
- ],
- "timestampField": "timestamp"
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java
index 70487a0..eae3c52 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ConfigUploadComponent.java
@@ -19,12 +19,15 @@
*/
package org.apache.metron.profiler.storm.integration;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.ZKServerComponent;
+import java.util.Arrays;
import java.util.Properties;
import static org.apache.metron.common.configuration.ConfigurationsUtils.getClient;
@@ -41,7 +44,8 @@ public class ConfigUploadComponent implements InMemoryComponent {
private Properties topologyProperties;
private String globalConfiguration;
- private String profilerConfiguration;
+ private String profilerConfigurationPath;
+ private ProfilerConfig profilerConfig;
@Override
public void start() throws UnableToStartException {
@@ -86,11 +90,17 @@ public class ConfigUploadComponent implements InMemoryComponent {
* @param client The zookeeper client.
*/
private void uploadProfilerConfig(CuratorFramework client) throws Exception {
- if (profilerConfiguration != null) {
- byte[] globalConfig = readProfilerConfigFromFile(profilerConfiguration);
- if (globalConfig.length > 0) {
- writeProfilerConfigToZookeeper(readProfilerConfigFromFile(profilerConfiguration), client);
- }
+ byte[] configBytes = null;
+
+ if (profilerConfigurationPath != null) {
+ configBytes = readProfilerConfigFromFile(profilerConfigurationPath);
+
+ } else if(profilerConfig != null) {
+ configBytes = profilerConfig.toJSON().getBytes();
+ }
+
+ if (ArrayUtils.getLength(configBytes) > 0) {
+ writeProfilerConfigToZookeeper(configBytes, client);
}
}
@@ -117,8 +127,13 @@ public class ConfigUploadComponent implements InMemoryComponent {
return this;
}
- public ConfigUploadComponent withProfilerConfiguration(String path) {
- this.profilerConfiguration = path;
+ public ConfigUploadComponent withProfilerConfigurationPath(String path) {
+ this.profilerConfigurationPath = path;
+ return this;
+ }
+
+ public ConfigUploadComponent withProfilerConfiguration(ProfilerConfig profilerConfig) {
+ this.profilerConfig = profilerConfig;
return this;
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
index 64a1482..f7e75ce 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@ -23,6 +23,7 @@ package org.apache.metron.profiler.storm.integration;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.commons.io.FileUtils;
import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.hbase.mock.MockHBaseTableProvider;
import org.apache.metron.hbase.mock.MockHTable;
import org.apache.metron.integration.BaseIntegrationTest;
@@ -90,18 +91,15 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
public static final long startAt = 10;
public static final String entity = "10.0.0.1";
-
private static final String tableName = "profiler";
private static final String columnFamily = "P";
private static final String inputTopic = Constants.INDEXING_TOPIC;
private static final String outputTopic = "profiles";
private static final int saltDivisor = 10;
-
private static final long periodDurationMillis = TimeUnit.SECONDS.toMillis(20);
private static final long windowLagMillis = TimeUnit.SECONDS.toMillis(10);
private static final long windowDurationMillis = TimeUnit.SECONDS.toMillis(10);
private static final long profileTimeToLiveMillis = TimeUnit.SECONDS.toMillis(20);
-
private static final long maxRoutesPerBolt = 100000;
private static ZKServerComponent zkComponent;
@@ -110,11 +108,9 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
private static ConfigUploadComponent configUploadComponent;
private static ComponentRunner runner;
private static MockHTable profilerTable;
-
private static String message1;
private static String message2;
private static String message3;
-
private StellarStatefulExecutor executor;
/**
@@ -135,9 +131,25 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@Multiline
private static String kryoSerializers;
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "processing-time-test",
+ * "foreach": "ip_src_addr",
+ * "init": { "counter": "0" },
+ * "update": { "counter": "counter + 1" },
+ * "result": "counter"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ private static String processingTimeProfile;
+
@Test
public void testProcessingTime() throws Exception {
- uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
+ uploadConfigToZookeeper(ProfilerConfig.fromJSON(processingTimeProfile));
// start the topology and write 3 test messages to kafka
fluxComponent.submitTopology();
@@ -146,7 +158,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
kafkaComponent.writeMessages(inputTopic, message3);
// retrieve the profile measurement using PROFILE_GET
- String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+ String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))";
List<Integer> measurements = execute(profileGetExpression, List.class);
// need to keep checking for measurements until the profiler has flushed one out
@@ -178,7 +190,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
@Test
public void testProcessingTimeWithTimeToLiveFlush() throws Exception {
- uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/processing-time-test");
+ uploadConfigToZookeeper(ProfilerConfig.fromJSON(processingTimeProfile));
// start the topology and write 3 test messages to kafka
fluxComponent.submitTopology();
@@ -194,7 +206,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
kafkaComponent.writeMessages(inputTopic, message3);
// retrieve the profile measurement using PROFILE_GET
- String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('5', 'MINUTES'))";
+ String profileGetExpression = "PROFILE_GET('processing-time-test', '10.0.0.1', PROFILE_FIXED('15', 'MINUTES'))";
List<Integer> measurements = execute(profileGetExpression, List.class);
// need to keep checking for measurements until the profiler has flushed one out
@@ -221,9 +233,33 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
assertEquals(3, measurements.get(0).intValue());
}
+ /**
+ * {
+ * "timestampField": "timestamp",
+ * "profiles": [
+ * {
+ * "profile": "count-by-ip",
+ * "foreach": "ip_src_addr",
+ * "init": { "count": 0 },
+ * "update": { "count" : "count + 1" },
+ * "result": "count"
+ * },
+ * {
+ * "profile": "total-count",
+ * "foreach": "'total'",
+ * "init": { "count": 0 },
+ * "update": { "count": "count + 1" },
+ * "result": "count"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ private static String eventTimeProfile;
+
@Test
public void testEventTime() throws Exception {
- uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/event-time-test");
+ uploadConfigToZookeeper(ProfilerConfig.fromJSON(eventTimeProfile));
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
@@ -264,6 +300,23 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
}
/**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "profile-with-stats",
+ * "foreach": "'global'",
+ * "init": { "stats": "STATS_INIT()" },
+ * "update": { "stats": "STATS_ADD(stats, 1)" },
+ * "result": "stats"
+ * }
+ * ],
+ * "timestampField": "timestamp"
+ * }
+ */
+ @Multiline
+ private static String profileWithStats;
+
+ /**
* The result produced by a Profile has to be serializable within Storm. If the result is not
* serializable the topology will crash and burn.
*
@@ -272,7 +325,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
*/
@Test
public void testProfileWithStatsObject() throws Exception {
- uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/profile-with-stats");
+ uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithStats));
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
@@ -293,9 +346,34 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
assertTrue(results.get(0) instanceof OnlineStatisticsProvider);
}
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "profile-with-triage",
+ * "foreach": "'global'",
+ * "update": {
+ * "stats": "STATS_ADD(stats, 1)"
+ * },
+ * "result": {
+ * "profile": "stats",
+ * "triage": {
+ * "min": "STATS_MIN(stats)",
+ * "max": "STATS_MAX(stats)",
+ * "mean": "STATS_MEAN(stats)"
+ * }
+ * }
+ * }
+ * ],
+ * "timestampField": "timestamp"
+ * }
+ */
+ @Multiline
+ private static String profileWithTriageResult;
+
@Test
public void testProfileWithTriageResult() throws Exception {
- uploadConfigToZookeeper(TEST_RESOURCES + "/config/zookeeper/triage-result");
+ uploadConfigToZookeeper(ProfilerConfig.fromJSON(profileWithTriageResult));
// start the topology and write test messages to kafka
fluxComponent.submitTopology();
@@ -466,13 +544,12 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
/**
* Uploads config values to Zookeeper.
- * @param path The path on the local filesystem to the config values.
+ * @param profilerConfig The Profiler configuration.
* @throws Exception
*/
- public void uploadConfigToZookeeper(String path) throws Exception {
+ public void uploadConfigToZookeeper(ProfilerConfig profilerConfig) throws Exception {
configUploadComponent
- .withGlobalConfiguration(path)
- .withProfilerConfiguration(path)
+ .withProfilerConfiguration(profilerConfig)
.update();
}
http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
index a7dc248..ce898d3 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java
@@ -119,7 +119,6 @@ public class ZKConfigurationsCacheIntegrationTest {
@Multiline
public static String globalConfig;
- public static File profilerDir = new File("../../metron-analytics/metron-profiler-storm/src/test/config/zookeeper");
public ConfigurationsCache cache;
public ZKServerComponent zkComponent;
@@ -154,7 +153,7 @@ public class ZKConfigurationsCacheIntegrationTest {
}
{
//profiler
- byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/event-time-test/profiler.json")));
+ byte[] config = IOUtils.toByteArray(new FileInputStream(new File("src/test/resources/profiler/profiler.json")));
ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client);
}
{
@@ -284,7 +283,7 @@ public class ZKConfigurationsCacheIntegrationTest {
}
//profiler
{
- File inFile = new File(profilerDir, "/event-time-test/profiler.json");
+ File inFile = new File("src/test/resources/profiler/profiler.json");
ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, ProfilerConfig.class);
ProfilerConfigurations config = cache.get( ProfilerConfigurations.class);
assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig()));
http://git-wip-us.apache.org/repos/asf/metron/blob/5bfc08c5/metron-platform/metron-common/src/test/resources/profiler/profiler.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/resources/profiler/profiler.json b/metron-platform/metron-common/src/test/resources/profiler/profiler.json
new file mode 100644
index 0000000..32be68d
--- /dev/null
+++ b/metron-platform/metron-common/src/test/resources/profiler/profiler.json
@@ -0,0 +1,19 @@
+{
+ "timestampField": "timestamp",
+ "profiles": [
+ {
+ "profile": "count-by-ip",
+ "foreach": "ip_src_addr",
+ "init": { "count": 0 },
+ "update": { "count" : "count + 1" },
+ "result": "count"
+ },
+ {
+ "profile": "total-count",
+ "foreach": "'total'",
+ "init": { "count": 0 },
+ "update": { "count": "count + 1" },
+ "result": "count"
+ }
+ ]
+}