You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/04/27 19:29:50 UTC

[10/50] [abbrv] metron git commit: METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index ceb9e4e..ccce022 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -287,6 +287,8 @@ profiler_input_topic = config['configurations']['metron-enrichment-env']['enrich
 profiler_kafka_start = config['configurations']['metron-profiler-env']['profiler_kafka_start']
 profiler_period_duration = config['configurations']['metron-profiler-env']['profiler_period_duration']
 profiler_period_units = config['configurations']['metron-profiler-env']['profiler_period_units']
+profiler_window_duration = config['configurations']['metron-profiler-env']['profiler_window_duration']
+profiler_window_units = config['configurations']['metron-profiler-env']['profiler_window_units']
 profiler_ttl = config['configurations']['metron-profiler-env']['profiler_ttl']
 profiler_ttl_units = config['configurations']['metron-profiler-env']['profiler_ttl_units']
 profiler_hbase_batch = config['configurations']['metron-profiler-env']['profiler_hbase_batch']
@@ -302,6 +304,11 @@ profiler_hbase_acl_configured_flag_file = status_params.profiler_hbase_acl_confi
 if not len(profiler_topology_worker_childopts) == 0:
     profiler_topology_worker_childopts += ' '
 profiler_topology_worker_childopts += config['configurations']['metron-profiler-env']['profiler_topology_worker_childopts']
+profiler_max_routes_per_bolt=config['configurations']['metron-profiler-env']['profiler_max_routes_per_bolt']
+profiler_window_lag=config['configurations']['metron-profiler-env']['profiler_window_lag']
+profiler_window_lag_units=config['configurations']['metron-profiler-env']['profiler_window_lag_units']
+profiler_topology_message_timeout_secs=config['configurations']['metron-profiler-env']['profiler_topology_message_timeout_secs']
+profiler_topology_max_spout_pending=config['configurations']['metron-profiler-env']['profiler_topology_max_spout_pending']
 
 # Indexing
 ra_indexing_kafka_start = config['configurations']['metron-indexing-env']['ra_indexing_kafka_start']

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
index 06fd209..fabdaa7 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
@@ -22,6 +22,10 @@
 
 topology.worker.childopts={{profiler_topology_worker_childopts}}
 topology.auto-credentials={{topology_auto_credentials}}
+profiler.workers={{profiler_topology_workers}}
+profiler.executors={{profiler_acker_executors}}
+topology.message.timeout.secs={{profiler_topology_message_timeout_secs}}
+topology.max.spout.pending={{profiler_topology_max_spout_pending}}
 
 ##### Profiler #####
 
@@ -29,10 +33,16 @@ profiler.input.topic={{enrichment_output_topic}}
 profiler.output.topic={{enrichment_input_topic}}
 profiler.period.duration={{profiler_period_duration}}
 profiler.period.duration.units={{profiler_period_units}}
-profiler.workers={{profiler_topology_workers}}
-profiler.executors={{profiler_acker_executors}}
+profiler.window.duration={{profiler_window_duration}}
+profiler.window.duration.units={{profiler_window_units}}
 profiler.ttl={{profiler_ttl}}
 profiler.ttl.units={{profiler_ttl_units}}
+profiler.window.lag={{profiler_window_lag}}
+profiler.window.lag.units={{profiler_window_lag_units}}
+profiler.max.routes.per.bolt={{profiler_max_routes_per_bolt}}
+
+##### HBase #####
+
 profiler.hbase.salt.divisor=1000
 profiler.hbase.table={{profiler_hbase_table}}
 profiler.hbase.column.family={{profiler_hbase_cf}}
@@ -43,6 +53,5 @@ profiler.hbase.flush.interval.seconds={{profiler_hbase_flush_interval}}
 
 kafka.zk={{zookeeper_quorum}}
 kafka.broker={{kafka_brokers}}
-# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
 kafka.start={{profiler_kafka_start}}
 kafka.security.protocol={{kafka_security_protocol}}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index cef9a3b..234b551 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -221,8 +221,27 @@
               "tab-rows": "3",
               "sections": [
                 {
+                  "name": "section-profiler-setup",
+                  "row-index": "0",
+                  "column-index": "0",
+                  "row-span": "1",
+                  "column-span": "1",
+                  "section-columns": "1",
+                  "section-rows": "1",
+                  "subsections": [
+                    {
+                    "name": "subsection-profiler-setup",
+                    "display-name": "Profiler Setup",
+                    "row-index": "0",
+                    "column-index": "0",
+                    "row-span": "1",
+                    "column-span": "1"
+                  }
+                  ]
+                },
+                {
                 "name": "section-profiler-kafka",
-                "row-index": "0",
+                "row-index": "1",
                 "column-index": "0",
                 "row-span": "1",
                 "column-span": "1",
@@ -240,8 +259,8 @@
                 ]
               },
               {
-                "name": "section-profiler-setup",
-                "row-index": "1",
+                "name": "section-profiler-storm",
+                "row-index": "2",
                 "column-index": "0",
                 "row-span": "1",
                 "column-span": "1",
@@ -249,8 +268,8 @@
                 "section-rows": "1",
                 "subsections": [
                   {
-                  "name": "subsection-profiler-setup",
-                  "display-name": "Profiler Setup",
+                  "name": "subsection-profiler-storm",
+                  "display-name": "Storm",
                   "row-index": "0",
                   "column-index": "0",
                   "row-span": "1",
@@ -259,8 +278,8 @@
                 ]
               },
               {
-                "name": "section-profiler-storm",
-                "row-index": "2",
+                "name": "section-profiler-hbase",
+                "row-index": "3",
                 "column-index": "0",
                 "row-span": "1",
                 "column-span": "1",
@@ -268,8 +287,8 @@
                 "section-rows": "1",
                 "subsections": [
                   {
-                  "name": "subsection-profiler-storm",
-                  "display-name": "Storm",
+                  "name": "subsection-profiler-hbase",
+                  "display-name": "HBase",
                   "row-index": "0",
                   "column-index": "0",
                   "row-span": "1",
@@ -568,7 +587,6 @@
           "config": "metron-indexing-env/bolt_hdfs_rotation_policy_count",
           "subsection-name": "subsection-indexing-hdfs"
         },
-
         {
           "config": "metron-profiler-env/profiler_kafka_start",
           "subsection-name": "subsection-profiler-kafka"
@@ -582,6 +600,14 @@
           "subsection-name": "subsection-profiler-setup"
         },
         {
+          "config": "metron-profiler-env/profiler_window_duration",
+          "subsection-name": "subsection-profiler-setup"
+        },
+        {
+          "config": "metron-profiler-env/profiler_window_units",
+          "subsection-name": "subsection-profiler-setup"
+        },
+        {
           "config": "metron-profiler-env/profiler_ttl",
           "subsection-name": "subsection-profiler-setup"
         },
@@ -590,20 +616,32 @@
           "subsection-name": "subsection-profiler-setup"
         },
         {
-          "config": "metron-profiler-env/profiler_hbase_table",
+          "config": "metron-profiler-env/profiler_window_lag",
           "subsection-name": "subsection-profiler-setup"
         },
         {
-          "config": "metron-profiler-env/profiler_hbase_cf",
+          "config": "metron-profiler-env/profiler_window_lag_units",
           "subsection-name": "subsection-profiler-setup"
         },
         {
-          "config": "metron-profiler-env/profiler_hbase_batch",
+          "config": "metron-profiler-env/profiler_max_routes_per_bolt",
           "subsection-name": "subsection-profiler-setup"
         },
         {
+          "config": "metron-profiler-env/profiler_hbase_table",
+          "subsection-name": "subsection-profiler-hbase"
+        },
+        {
+          "config": "metron-profiler-env/profiler_hbase_cf",
+          "subsection-name": "subsection-profiler-hbase"
+        },
+        {
+          "config": "metron-profiler-env/profiler_hbase_batch",
+          "subsection-name": "subsection-profiler-hbase"
+        },
+        {
           "config": "metron-profiler-env/profiler_hbase_flush_interval",
-          "subsection-name": "subsection-profiler-setup"
+          "subsection-name": "subsection-profiler-hbase"
         },
         {
           "config": "metron-profiler-env/profiler_topology_worker_childopts",
@@ -618,6 +656,14 @@
           "subsection-name": "subsection-profiler-storm"
         },
         {
+          "config": "metron-profiler-env/profiler_topology_message_timeout_secs",
+          "subsection-name": "subsection-profiler-storm"
+        },
+        {
+          "config": "metron-profiler-env/profiler_topology_max_spout_pending",
+          "subsection-name": "subsection-profiler-storm"
+        },
+        {
           "config": "metron-rest-env/metron_rest_port",
           "subsection-name": "subsection-rest"
         },
@@ -905,7 +951,6 @@
           "type": "text-field"
         }
       },
-
       {
         "config": "metron-indexing-env/batch_indexing_acker_executors",
         "widget": {
@@ -1004,6 +1049,18 @@
         }
       },
       {
+        "config": "metron-profiler-env/profiler_window_duration",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
+        "config": "metron-profiler-env/profiler_window_units",
+        "widget": {
+          "type": "combo"
+        }
+      },
+      {
         "config": "metron-profiler-env/profiler_ttl",
         "widget": {
           "type": "text-field"
@@ -1016,6 +1073,24 @@
         }
       },
       {
+        "config": "metron-profiler-env/profiler_max_routes_per_bolt",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
+        "config": "metron-profiler-env/profiler_window_lag",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
+        "config": "metron-profiler-env/profiler_window_lag_units",
+        "widget": {
+          "type": "combo"
+        }
+      },
+      {
         "config": "metron-profiler-env/profiler_hbase_table",
         "widget": {
           "type": "text-field"
@@ -1057,7 +1132,18 @@
           "type": "text-field"
         }
       },
-
+      {
+        "config": "metron-profiler-env/profiler_topology_max_spout_pending",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
+        "config": "metron-profiler-env/profiler_topology_message_timeout_secs",
+        "widget": {
+          "type": "text-field"
+        }
+      },
       {
         "config": "metron-rest-env/metron_rest_port",
         "widget": {

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
index 06c82d2..6205fbf 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
@@ -89,6 +89,9 @@ public class ProfileConfig implements Serializable {
    */
   private Long expires;
 
+  public ProfileConfig() {
+  }
+
   /**
    * A profile definition requires at the very least the profile name, the foreach, and result
    * expressions.
@@ -114,6 +117,11 @@ public class ProfileConfig implements Serializable {
     this.profile = profile;
   }
 
+  public ProfileConfig withProfile(String profile) {
+    this.profile = profile;
+    return this;
+  }
+
   public String getForeach() {
     return foreach;
   }
@@ -122,6 +130,11 @@ public class ProfileConfig implements Serializable {
     this.foreach = foreach;
   }
 
+  public ProfileConfig withForeach(String foreach) {
+    this.foreach = foreach;
+    return this;
+  }
+
   public String getOnlyif() {
     return onlyif;
   }
@@ -130,6 +143,11 @@ public class ProfileConfig implements Serializable {
     this.onlyif = onlyif;
   }
 
+  public ProfileConfig withOnlyif(String onlyif) {
+    this.onlyif = onlyif;
+    return this;
+  }
+
   public Map<String, String> getInit() {
     return init;
   }
@@ -138,6 +156,16 @@ public class ProfileConfig implements Serializable {
     this.init = init;
   }
 
+  public ProfileConfig withInit(Map<String, String> init) {
+    this.init.putAll(init);
+    return this;
+  }
+
+  public ProfileConfig withInit(String var, String expression) {
+    this.init.put(var, expression);
+    return this;
+  }
+
   public Map<String, String> getUpdate() {
     return update;
   }
@@ -146,6 +174,16 @@ public class ProfileConfig implements Serializable {
     this.update = update;
   }
 
+  public ProfileConfig withUpdate(Map<String, String> update) {
+    this.update.putAll(update);
+    return this;
+  }
+
+  public ProfileConfig withUpdate(String var, String expression) {
+    this.update.put(var, expression);
+    return this;
+  }
+
   public List<String> getGroupBy() {
     return groupBy;
   }
@@ -154,6 +192,11 @@ public class ProfileConfig implements Serializable {
     this.groupBy = groupBy;
   }
 
+  public ProfileConfig withGroupBy(List<String> groupBy) {
+    this.groupBy = groupBy;
+    return this;
+  }
+
   public ProfileResult getResult() {
     return result;
   }
@@ -162,6 +205,11 @@ public class ProfileConfig implements Serializable {
     this.result = result;
   }
 
+  public ProfileConfig withResult(String profileExpression) {
+    this.result = new ProfileResult(profileExpression);
+    return this;
+  }
+
   public Long getExpires() {
     return expires;
   }
@@ -170,6 +218,11 @@ public class ProfileConfig implements Serializable {
     this.expires = expiresDays;
   }
 
+  public ProfileConfig withExpires(Long expiresDays) {
+    this.expires = TimeUnit.DAYS.toMillis(expiresDays);
+    return this;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
index e7c081a..0bdb7e2 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
@@ -20,9 +20,10 @@ package org.apache.metron.common.configuration.profiler;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 /**
- * The definition for entire Profiler, which may contain many Profile definitions.
+ * The configuration object for the Profiler, which may contain many Profile definitions.
  */
 public class ProfilerConfig implements Serializable {
 
@@ -31,6 +32,20 @@ public class ProfilerConfig implements Serializable {
    */
   private List<ProfileConfig> profiles = new ArrayList<>();
 
+  /**
+   * The name of a field containing the timestamp that is used to
+   * generate profiles.
+   *
+   * <p>By default, the processing time of the Profiler is used rather
+   * than event time; a value contained within the message itself.
+   *
+   * <p>The field must contain a timestamp in epoch milliseconds.
+   *
+   * <p>If a message does NOT contain this field, it will be dropped
+   * and not included in any profiles.
+   */
+  private Optional<String> timestampField = Optional.empty();
+
   public List<ProfileConfig> getProfiles() {
     return profiles;
   }
@@ -39,10 +54,33 @@ public class ProfilerConfig implements Serializable {
     this.profiles = profiles;
   }
 
+  public ProfilerConfig withProfile(ProfileConfig profileConfig) {
+    this.profiles.add(profileConfig);
+    return this;
+  }
+
+  public Optional<String> getTimestampField() {
+    return timestampField;
+  }
+
+  public void setTimestampField(String timestampField) {
+    this.timestampField = Optional.ofNullable(timestampField); // null means "no timestamp field" rather than an NPE
+  }
+
+  public void setTimestampField(Optional<String> timestampField) {
+    this.timestampField = timestampField;
+  }
+
+  public ProfilerConfig withTimestampField(Optional<String> timestampField) {
+    this.timestampField = timestampField;
+    return this;
+  }
+
   @Override
   public String toString() {
     return "ProfilerConfig{" +
             "profiles=" + profiles +
+            ", timestampField='" + timestampField + '\'' +
             '}';
   }
 
@@ -50,13 +88,15 @@ public class ProfilerConfig implements Serializable {
   public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
-
     ProfilerConfig that = (ProfilerConfig) o;
-    return profiles != null ? profiles.equals(that.profiles) : that.profiles == null;
+    if (profiles != null ? !profiles.equals(that.profiles) : that.profiles != null) return false;
+    return timestampField != null ? timestampField.equals(that.timestampField) : that.timestampField == null;
   }
 
   @Override
   public int hashCode() {
-    return profiles != null ? profiles.hashCode() : 0;
+    int result = profiles != null ? profiles.hashCode() : 0;
+    result = 31 * result + (timestampField != null ? timestampField.hashCode() : 0);
+    return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
index c02f19d..02e6015 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
@@ -24,6 +24,10 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.flipkart.zjsonpatch.JsonPatch;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -31,17 +35,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
 
-import com.google.common.reflect.TypeToken;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-
 public enum JSONUtils {
   INSTANCE;
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
index 68c5203..4976d30 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.function.Supplier;
 
 public class ProfilerUpdater extends ConfigurationsUpdater<ProfilerConfigurations> {
+
   public ProfilerUpdater(Reloadable reloadable, Supplier<ProfilerConfigurations> configSupplier) {
     super(reloadable, configSupplier);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
index a0e115d..e178ee0 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
@@ -27,10 +27,11 @@ import org.junit.Test;
 import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 
 /**
- * Ensures that Profile definitions have the expected defaults
+ * Tests the {@link ProfileConfig} class.
+ *
+ * Ensures that profile definitions have the expected defaults
  * and can be (de)serialized to and from JSON.
  */
 public class ProfileConfigTest {

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
new file mode 100644
index 0000000..2e73cde
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
@@ -0,0 +1,120 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.common.configuration.profiler;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.utils.JSONUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link ProfilerConfig} class.
+ */
+public class ProfilerConfigTest {
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result":   "count"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String noTimestampField;
+
+  /**
+   * If no 'timestampField' is defined, it should not be present by default.
+   */
+  @Test
+  public void testNoTimestampField() throws IOException {
+    ProfilerConfig conf = JSONUtils.INSTANCE.load(noTimestampField, ProfilerConfig.class);
+    assertFalse(conf.getTimestampField().isPresent());
+  }
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result":   "count"
+   *      }
+   *   ],
+   *   "timestampField": "timestamp"
+   * }
+   */
+  @Multiline
+  private String timestampField;
+
+  /**
+   * If a 'timestampField' is defined, it should be present after the configuration is loaded.
+   */
+  @Test
+  public void testTimestampField() throws IOException {
+    ProfilerConfig conf = JSONUtils.INSTANCE.load(timestampField, ProfilerConfig.class);
+    assertTrue(conf.getTimestampField().isPresent());
+  }
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result":   "count"
+   *      },
+   *      {
+   *        "profile": "profile2",
+   *        "foreach": "ip_dst_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result":   "count"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String twoProfiles;
+
+  /**
+   * All of the profile definitions contained in the configuration should be loaded.
+   */
+  @Test
+  public void testTwoProfiles() throws IOException {
+    ProfilerConfig conf = JSONUtils.INSTANCE.load(twoProfiles, ProfilerConfig.class);
+    assertEquals(2, conf.getProfiles().size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
index 9d8c57e..08910be 100644
--- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java
@@ -30,6 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.logging.Level;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.common.TopicExistsException;
@@ -48,6 +51,7 @@ import kafka.utils.Time;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.metron.integration.InMemoryComponent;
@@ -314,11 +318,44 @@ public class KafkaComponent implements InMemoryComponent {
     }
   }
 
+  /**
+   * Write a collection of messages to a Kafka topic.
+   *
+   * @param topic The name of the Kafka topic.
+   * @param messages The collection of messages to write.
+   */
   public void writeMessages(String topic, Collection<byte[]> messages) {
     try(KafkaProducer<String, byte[]> kafkaProducer = createProducer()) {
       for (byte[] message : messages) {
-        kafkaProducer.send(new ProducerRecord<String, byte[]>(topic, message));
+        kafkaProducer.send(new ProducerRecord<>(topic, message));
       }
     }
   }
+
+  /**
+   * Write messages to a Kafka topic.
+   *
+   * @param topic The name of the Kafka topic.
+   * @param messages The messages to write.
+   */
+  public void writeMessages(String topic, String ...messages) {
+
+    // convert each message to raw bytes
+    List<byte[]> messagesAsBytes = Stream.of(messages)
+            .map(Bytes::toBytes)
+            .collect(Collectors.toList());
+
+    writeMessages(topic, messagesAsBytes);
+  }
+
+  /**
+   * Write messages to a Kafka topic.
+   *
+   * @param topic The name of the Kafka topic.
+   * @param messages The messages to write.
+   */
+  public void writeMessages(String topic, List<String> messages) {
+
+    writeMessages(topic, messages.toArray(new String[] {}));
+  }
 }