You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 14:59:48 UTC
[18/52] [abbrv] metron git commit: METRON-590 Enable Use of Event
Time in Profiler (nickwallen) closes apache/metron#965
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index ceb9e4e..ccce022 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -287,6 +287,8 @@ profiler_input_topic = config['configurations']['metron-enrichment-env']['enrich
profiler_kafka_start = config['configurations']['metron-profiler-env']['profiler_kafka_start']
profiler_period_duration = config['configurations']['metron-profiler-env']['profiler_period_duration']
profiler_period_units = config['configurations']['metron-profiler-env']['profiler_period_units']
+profiler_window_duration = config['configurations']['metron-profiler-env']['profiler_window_duration']
+profiler_window_units = config['configurations']['metron-profiler-env']['profiler_window_units']
profiler_ttl = config['configurations']['metron-profiler-env']['profiler_ttl']
profiler_ttl_units = config['configurations']['metron-profiler-env']['profiler_ttl_units']
profiler_hbase_batch = config['configurations']['metron-profiler-env']['profiler_hbase_batch']
@@ -302,6 +304,11 @@ profiler_hbase_acl_configured_flag_file = status_params.profiler_hbase_acl_confi
if not len(profiler_topology_worker_childopts) == 0:
profiler_topology_worker_childopts += ' '
profiler_topology_worker_childopts += config['configurations']['metron-profiler-env']['profiler_topology_worker_childopts']
+profiler_max_routes_per_bolt=config['configurations']['metron-profiler-env']['profiler_max_routes_per_bolt']
+profiler_window_lag=config['configurations']['metron-profiler-env']['profiler_window_lag']
+profiler_window_lag_units=config['configurations']['metron-profiler-env']['profiler_window_lag_units']
+profiler_topology_message_timeout_secs=config['configurations']['metron-profiler-env']['profiler_topology_message_timeout_secs']
+profiler_topology_max_spout_pending=config['configurations']['metron-profiler-env']['profiler_topology_max_spout_pending']
# Indexing
ra_indexing_kafka_start = config['configurations']['metron-indexing-env']['ra_indexing_kafka_start']
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
index 06fd209..fabdaa7 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
@@ -22,6 +22,10 @@
topology.worker.childopts={{profiler_topology_worker_childopts}}
topology.auto-credentials={{topology_auto_credentials}}
+profiler.workers={{profiler_topology_workers}}
+profiler.executors={{profiler_acker_executors}}
+topology.message.timeout.secs={{profiler_topology_message_timeout_secs}}
+topology.max.spout.pending={{profiler_topology_max_spout_pending}}
##### Profiler #####
@@ -29,10 +33,16 @@ profiler.input.topic={{enrichment_output_topic}}
profiler.output.topic={{enrichment_input_topic}}
profiler.period.duration={{profiler_period_duration}}
profiler.period.duration.units={{profiler_period_units}}
-profiler.workers={{profiler_topology_workers}}
-profiler.executors={{profiler_acker_executors}}
+profiler.window.duration={{profiler_window_duration}}
+profiler.window.duration.units={{profiler_window_units}}
profiler.ttl={{profiler_ttl}}
profiler.ttl.units={{profiler_ttl_units}}
+profiler.window.lag={{profiler_window_lag}}
+profiler.window.lag.units={{profiler_window_lag_units}}
+profiler.max.routes.per.bolt={{profiler_max_routes_per_bolt}}
+
+##### HBase #####
+
profiler.hbase.salt.divisor=1000
profiler.hbase.table={{profiler_hbase_table}}
profiler.hbase.column.family={{profiler_hbase_cf}}
@@ -43,6 +53,5 @@ profiler.hbase.flush.interval.seconds={{profiler_hbase_flush_interval}}
kafka.zk={{zookeeper_quorum}}
kafka.broker={{kafka_brokers}}
-# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
kafka.start={{profiler_kafka_start}}
kafka.security.protocol={{kafka_security_protocol}}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index cef9a3b..234b551 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -221,8 +221,27 @@
"tab-rows": "3",
"sections": [
{
+ "name": "section-profiler-setup",
+ "row-index": "0",
+ "column-index": "0",
+ "row-span": "1",
+ "column-span": "1",
+ "section-columns": "1",
+ "section-rows": "1",
+ "subsections": [
+ {
+ "name": "subsection-profiler-setup",
+ "display-name": "Profiler Setup",
+ "row-index": "0",
+ "column-index": "0",
+ "row-span": "1",
+ "column-span": "1"
+ }
+ ]
+ },
+ {
"name": "section-profiler-kafka",
- "row-index": "0",
+ "row-index": "1",
"column-index": "0",
"row-span": "1",
"column-span": "1",
@@ -240,8 +259,8 @@
]
},
{
- "name": "section-profiler-setup",
- "row-index": "1",
+ "name": "section-profiler-storm",
+ "row-index": "2",
"column-index": "0",
"row-span": "1",
"column-span": "1",
@@ -249,8 +268,8 @@
"section-rows": "1",
"subsections": [
{
- "name": "subsection-profiler-setup",
- "display-name": "Profiler Setup",
+ "name": "subsection-profiler-storm",
+ "display-name": "Storm",
"row-index": "0",
"column-index": "0",
"row-span": "1",
@@ -259,8 +278,8 @@
]
},
{
- "name": "section-profiler-storm",
- "row-index": "2",
+ "name": "section-profiler-hbase",
+ "row-index": "3",
"column-index": "0",
"row-span": "1",
"column-span": "1",
@@ -268,8 +287,8 @@
"section-rows": "1",
"subsections": [
{
- "name": "subsection-profiler-storm",
- "display-name": "Storm",
+ "name": "subsection-profiler-hbase",
+ "display-name": "HBase",
"row-index": "0",
"column-index": "0",
"row-span": "1",
@@ -568,7 +587,6 @@
"config": "metron-indexing-env/bolt_hdfs_rotation_policy_count",
"subsection-name": "subsection-indexing-hdfs"
},
-
{
"config": "metron-profiler-env/profiler_kafka_start",
"subsection-name": "subsection-profiler-kafka"
@@ -582,6 +600,14 @@
"subsection-name": "subsection-profiler-setup"
},
{
+ "config": "metron-profiler-env/profiler_window_duration",
+ "subsection-name": "subsection-profiler-setup"
+ },
+ {
+ "config": "metron-profiler-env/profiler_window_units",
+ "subsection-name": "subsection-profiler-setup"
+ },
+ {
"config": "metron-profiler-env/profiler_ttl",
"subsection-name": "subsection-profiler-setup"
},
@@ -590,20 +616,32 @@
"subsection-name": "subsection-profiler-setup"
},
{
- "config": "metron-profiler-env/profiler_hbase_table",
+ "config": "metron-profiler-env/profiler_window_lag",
"subsection-name": "subsection-profiler-setup"
},
{
- "config": "metron-profiler-env/profiler_hbase_cf",
+ "config": "metron-profiler-env/profiler_window_lag_units",
"subsection-name": "subsection-profiler-setup"
},
{
- "config": "metron-profiler-env/profiler_hbase_batch",
+ "config": "metron-profiler-env/profiler_max_routes_per_bolt",
"subsection-name": "subsection-profiler-setup"
},
{
+ "config": "metron-profiler-env/profiler_hbase_table",
+ "subsection-name": "subsection-profiler-hbase"
+ },
+ {
+ "config": "metron-profiler-env/profiler_hbase_cf",
+ "subsection-name": "subsection-profiler-hbase"
+ },
+ {
+ "config": "metron-profiler-env/profiler_hbase_batch",
+ "subsection-name": "subsection-profiler-hbase"
+ },
+ {
"config": "metron-profiler-env/profiler_hbase_flush_interval",
- "subsection-name": "subsection-profiler-setup"
+ "subsection-name": "subsection-profiler-hbase"
},
{
"config": "metron-profiler-env/profiler_topology_worker_childopts",
@@ -618,6 +656,14 @@
"subsection-name": "subsection-profiler-storm"
},
{
+ "config": "metron-profiler-env/profiler_topology_message_timeout_secs",
+ "subsection-name": "subsection-profiler-storm"
+ },
+ {
+ "config": "metron-profiler-env/profiler_topology_max_spout_pending",
+ "subsection-name": "subsection-profiler-storm"
+ },
+ {
"config": "metron-rest-env/metron_rest_port",
"subsection-name": "subsection-rest"
},
@@ -905,7 +951,6 @@
"type": "text-field"
}
},
-
{
"config": "metron-indexing-env/batch_indexing_acker_executors",
"widget": {
@@ -1004,6 +1049,18 @@
}
},
{
+ "config": "metron-profiler-env/profiler_window_duration",
+ "widget": {
+ "type": "text-field"
+ }
+ },
+ {
+ "config": "metron-profiler-env/profiler_window_units",
+ "widget": {
+ "type": "combo"
+ }
+ },
+ {
"config": "metron-profiler-env/profiler_ttl",
"widget": {
"type": "text-field"
@@ -1016,6 +1073,24 @@
}
},
{
+ "config": "metron-profiler-env/profiler_max_routes_per_bolt",
+ "widget": {
+ "type": "text-field"
+ }
+ },
+ {
+ "config": "metron-profiler-env/profiler_window_lag",
+ "widget": {
+ "type": "text-field"
+ }
+ },
+ {
+ "config": "metron-profiler-env/profiler_window_lag_units",
+ "widget": {
+ "type": "combo"
+ }
+ },
+ {
"config": "metron-profiler-env/profiler_hbase_table",
"widget": {
"type": "text-field"
@@ -1057,7 +1132,18 @@
"type": "text-field"
}
},
-
+ {
+ "config": "metron-profiler-env/profiler_topology_max_spout_pending",
+ "widget": {
+ "type": "text-field"
+ }
+ },
+ {
+ "config": "metron-profiler-env/profiler_topology_message_timeout_secs",
+ "widget": {
+ "type": "text-field"
+ }
+ },
{
"config": "metron-rest-env/metron_rest_port",
"widget": {
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
index 06c82d2..6205fbf 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
@@ -89,6 +89,9 @@ public class ProfileConfig implements Serializable {
*/
private Long expires;
+ public ProfileConfig() {
+ }
+
/**
* A profile definition requires at the very least the profile name, the foreach, and result
* expressions.
@@ -114,6 +117,11 @@ public class ProfileConfig implements Serializable {
this.profile = profile;
}
+ public ProfileConfig withProfile(String profile) {
+ this.profile = profile;
+ return this;
+ }
+
public String getForeach() {
return foreach;
}
@@ -122,6 +130,11 @@ public class ProfileConfig implements Serializable {
this.foreach = foreach;
}
+ public ProfileConfig withForeach(String foreach) {
+ this.foreach = foreach;
+ return this;
+ }
+
public String getOnlyif() {
return onlyif;
}
@@ -130,6 +143,11 @@ public class ProfileConfig implements Serializable {
this.onlyif = onlyif;
}
+ public ProfileConfig withOnlyif(String onlyif) {
+ this.onlyif = onlyif;
+ return this;
+ }
+
public Map<String, String> getInit() {
return init;
}
@@ -138,6 +156,16 @@ public class ProfileConfig implements Serializable {
this.init = init;
}
+ public ProfileConfig withInit(Map<String, String> init) {
+ this.init.putAll(init);
+ return this;
+ }
+
+ public ProfileConfig withInit(String var, String expression) {
+ this.init.put(var, expression);
+ return this;
+ }
+
public Map<String, String> getUpdate() {
return update;
}
@@ -146,6 +174,16 @@ public class ProfileConfig implements Serializable {
this.update = update;
}
+ public ProfileConfig withUpdate(Map<String, String> update) {
+ this.update.putAll(update);
+ return this;
+ }
+
+ public ProfileConfig withUpdate(String var, String expression) {
+ this.update.put(var, expression);
+ return this;
+ }
+
public List<String> getGroupBy() {
return groupBy;
}
@@ -154,6 +192,11 @@ public class ProfileConfig implements Serializable {
this.groupBy = groupBy;
}
+ public ProfileConfig withGroupBy(List<String> groupBy) {
+ this.groupBy = groupBy;
+ return this;
+ }
+
public ProfileResult getResult() {
return result;
}
@@ -162,6 +205,11 @@ public class ProfileConfig implements Serializable {
this.result = result;
}
+ public ProfileConfig withResult(String profileExpression) {
+ this.result = new ProfileResult(profileExpression);
+ return this;
+ }
+
public Long getExpires() {
return expires;
}
@@ -170,6 +218,11 @@ public class ProfileConfig implements Serializable {
this.expires = expiresDays;
}
+ public ProfileConfig withExpires(Long expiresDays) {
+ this.expires = TimeUnit.DAYS.toMillis(expiresDays);
+ return this;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
index e7c081a..0bdb7e2 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
@@ -20,9 +20,10 @@ package org.apache.metron.common.configuration.profiler;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
/**
- * The definition for entire Profiler, which may contain many Profile definitions.
+ * The configuration object for the Profiler, which may contain many Profile definitions.
*/
public class ProfilerConfig implements Serializable {
@@ -31,6 +32,20 @@ public class ProfilerConfig implements Serializable {
*/
private List<ProfileConfig> profiles = new ArrayList<>();
+ /**
+ * The name of a field containing the timestamp that is used to
+ * generate profiles.
+ *
+ * <p>By default, the Profiler uses processing time rather than event
+ * time, where event time is a timestamp contained within the message itself.
+ *
+ * <p>The field must contain a timestamp in epoch milliseconds.
+ *
+ * <p>If a message does NOT contain this field, it will be dropped
+ * and not included in any profiles.
+ */
+ private Optional<String> timestampField = Optional.empty();
+
public List<ProfileConfig> getProfiles() {
return profiles;
}
@@ -39,10 +54,33 @@ public class ProfilerConfig implements Serializable {
this.profiles = profiles;
}
+ public ProfilerConfig withProfile(ProfileConfig profileConfig) {
+ this.profiles.add(profileConfig);
+ return this;
+ }
+
+ public Optional<String> getTimestampField() {
+ return timestampField;
+ }
+
+ /**
+ * Sets the name of the field containing the timestamp.
+ *
+ * @param timestampField The name of the timestamp field. A null value
+ * leaves the timestamp field absent rather than throwing.
+ */
+ public void setTimestampField(String timestampField) {
+ this.timestampField = Optional.ofNullable(timestampField);
+ }
+
+ public void setTimestampField(Optional<String> timestampField) {
+ this.timestampField = timestampField;
+ }
+
+ public ProfilerConfig withTimestampField(Optional<String> timestampField) {
+ this.timestampField = timestampField;
+ return this;
+ }
+
@Override
public String toString() {
return "ProfilerConfig{" +
"profiles=" + profiles +
+ ", timestampField='" + timestampField + '\'' +
'}';
}
@@ -50,13 +88,15 @@ public class ProfilerConfig implements Serializable {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
-
ProfilerConfig that = (ProfilerConfig) o;
- return profiles != null ? profiles.equals(that.profiles) : that.profiles == null;
+ if (profiles != null ? !profiles.equals(that.profiles) : that.profiles != null) return false;
+ return timestampField != null ? timestampField.equals(that.timestampField) : that.timestampField == null;
}
@Override
public int hashCode() {
- return profiles != null ? profiles.hashCode() : 0;
+ int result = profiles != null ? profiles.hashCode() : 0;
+ result = 31 * result + (timestampField != null ? timestampField.hashCode() : 0);
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
index c02f19d..02e6015 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
@@ -24,6 +24,10 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.flipkart.zjsonpatch.JsonPatch;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -31,17 +35,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
-import com.google.common.reflect.TypeToken;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-
public enum JSONUtils {
INSTANCE;
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
index 68c5203..4976d30 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.function.Supplier;
public class ProfilerUpdater extends ConfigurationsUpdater<ProfilerConfigurations> {
+
public ProfilerUpdater(Reloadable reloadable, Supplier<ProfilerConfigurations> configSupplier) {
super(reloadable, configSupplier);
}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
index a0e115d..e178ee0 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
@@ -27,10 +27,11 @@ import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
/**
- * Ensures that Profile definitions have the expected defaults
+ * Tests the {@link ProfileConfig} class.
+ *
+ * Ensures that profile definitions have the expected defaults
* and can be (de)serialized to and from JSON.
*/
public class ProfileConfigTest {
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
new file mode 100644
index 0000000..2e73cde
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.common.configuration.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.utils.JSONUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link ProfilerConfig} class.
+ */
+public class ProfilerConfigTest {
+
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "profile1",
+ * "foreach": "ip_src_addr",
+ * "init": { "count": "0" },
+ * "update": { "count": "count + 1" },
+ * "result": "count"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ private String noTimestampField;
+
+ /**
+ * If no 'timestampField' is defined, it should not be present by default.
+ */
+ @Test
+ public void testNoTimestampField() throws IOException {
+ ProfilerConfig conf = JSONUtils.INSTANCE.load(noTimestampField, ProfilerConfig.class);
+ assertFalse(conf.getTimestampField().isPresent());
+ }
+
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "profile1",
+ * "foreach": "ip_src_addr",
+ * "init": { "count": "0" },
+ * "update": { "count": "count + 1" },
+ * "result": "count"
+ * }
+ * ],
+ * "timestampField": "timestamp"
+ * }
+ */
+ @Multiline
+ private String timestampField;
+
+ /**
+ * If a 'timestampField' is defined, it should be present.
+ */
+ @Test
+ public void testTimestampField() throws IOException {
+ ProfilerConfig conf = JSONUtils.INSTANCE.load(timestampField, ProfilerConfig.class);
+ assertTrue(conf.getTimestampField().isPresent());
+ }
+
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "profile1",
+ * "foreach": "ip_src_addr",
+ * "init": { "count": "0" },
+ * "update": { "count": "count + 1" },
+ * "result": "count"
+ * },
+ * {
+ * "profile": "profile2",
+ * "foreach": "ip_dst_addr",
+ * "init": { "count": "0" },
+ * "update": { "count": "count + 1" },
+ * "result": "count"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ private String twoProfiles;
+
+ /**
+ * Each profile defined in the configuration should be loaded.
+ */
+ @Test
+ public void testTwoProfiles() throws IOException {
+ ProfilerConfig conf = JSONUtils.INSTANCE.load(twoProfiles, ProfilerConfig.class);
+ assertEquals(2, conf.getProfiles().size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
index 9d8c57e..08910be 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
@@ -30,6 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.common.TopicExistsException;
@@ -48,6 +51,7 @@ import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.metron.integration.InMemoryComponent;
@@ -314,11 +318,44 @@ public class KafkaComponent implements InMemoryComponent {
}
}
+ /**
+ * Write a collection of messages to a Kafka topic.
+ *
+ * @param topic The name of the Kafka topic.
+ * @param messages The collection of messages to write.
+ */
public void writeMessages(String topic, Collection<byte[]> messages) {
try(KafkaProducer<String, byte[]> kafkaProducer = createProducer()) {
for (byte[] message : messages) {
- kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, message));
+ kafkaProducer.send(new ProducerRecord<>(topic, message));
}
}
}
+
+ /**
+ * Write messages to a Kafka topic.
+ *
+ * @param topic The name of the Kafka topic.
+ * @param messages The messages to write.
+ */
+ public void writeMessages(String topic, String ...messages) {
+
+ // convert each message to raw bytes
+ List<byte[]> messagesAsBytes = Stream.of(messages)
+ .map(Bytes::toBytes)
+ .collect(Collectors.toList());
+
+ writeMessages(topic, messagesAsBytes);
+ }
+
+ /**
+ * Write messages to a Kafka topic.
+ *
+ * @param topic The name of the Kafka topic.
+ * @param messages The messages to write.
+ */
+ public void writeMessages(String topic, List<String> messages) {
+
+ writeMessages(topic, messages.toArray(new String[] {}));
+ }
}