You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/05/14 19:28:19 UTC

[01/11] metron git commit: METRON-1511 Unable to Serialize Profiler Configuration (nickwallen) closes apache/metron#982

Repository: metron
Updated Branches:
  refs/heads/feature/METRON-1211-extensions-parsers-gradual a41611b1a -> b9453aabd


METRON-1511 Unable to Serialize Profiler Configuration (nickwallen) closes apache/metron#982


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/b5bf9a98
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/b5bf9a98
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/b5bf9a98

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: b5bf9a98725f866a7fee6470a8e763d17cc69ffd
Parents: a41611b
Author: nickwallen <ni...@nickallen.org>
Authored: Mon Apr 23 09:36:06 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Mon Apr 23 09:36:06 2018 -0400

----------------------------------------------------------------------
 .../configuration/profiler/ProfileConfig.java   |  57 ++++++++--
 .../profiler/ProfileResultExpressions.java      |   4 +-
 .../profiler/ProfileTriageExpressions.java      |   8 ++
 .../configuration/profiler/ProfilerConfig.java  |  81 ++++++++++++--
 .../profiler/ProfileConfigTest.java             | 102 ++++++++++++++---
 .../profiler/ProfilerConfigTest.java            | 109 +++++++++++++++++--
 6 files changed, 310 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/b5bf9a98/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
index f5b46e6..f2272c3 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java
@@ -18,12 +18,15 @@
 package org.apache.metron.common.configuration.profiler;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.metron.common.utils.JSONUtils;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -264,15 +267,47 @@ public class ProfileConfig implements Serializable {
 
   @Override
   public String toString() {
-    return "ProfileConfig{" +
-            "profile='" + profile + '\'' +
-            ", foreach='" + foreach + '\'' +
-            ", onlyif='" + onlyif + '\'' +
-            ", init=" + init +
-            ", update=" + update +
-            ", groupBy=" + groupBy +
-            ", result=" + result +
-            ", expires=" + expires +
-            '}';
+    return new ToStringBuilder(this)
+            .append("profile", profile)
+            .append("foreach", foreach)
+            .append("onlyif", onlyif)
+            .append("init", init)
+            .append("update", update)
+            .append("groupBy", groupBy)
+            .append("result", result)
+            .append("expires", expires)
+            .toString();
+  }
+
+  /**
+   * Deserialize a {@link ProfileConfig}.
+   *
+   * @param bytes Raw bytes containing a UTF-8 JSON String.
+   * @return The Profile definition.
+   * @throws IOException
+   */
+  public static ProfileConfig fromBytes(byte[] bytes) throws IOException {
+    return JSONUtils.INSTANCE.load(new String(bytes), ProfileConfig.class);
+  }
+
+  /**
+   * Deserialize a {@link ProfileConfig}.
+   *
+   * @param json A String containing JSON.
+   * @return The Profile definition.
+   * @throws IOException
+   */
+  public static ProfileConfig fromJSON(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfileConfig.class);
+  }
+
+  /**
+   * Serialize the profile definition to a JSON string.
+   *
+   * @return The Profiler configuration serialized as a JSON string.
+   * @throws JsonProcessingException
+   */
+  public String toJSON() throws JsonProcessingException {
+    return JSONUtils.INSTANCE.toJSON(this, true);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/b5bf9a98/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
index 82af223..5bcec72 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java
@@ -18,7 +18,7 @@
 package org.apache.metron.common.configuration.profiler;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonValue;
 
 /**
  * A Stellar expression that is executed to produce a single
@@ -26,7 +26,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
  */
 public class ProfileResultExpressions {
 
-  @JsonIgnore
   private String expression;
 
   @JsonCreator
@@ -34,6 +33,7 @@ public class ProfileResultExpressions {
     this.expression = expression;
   }
 
+  @JsonValue
   public String getExpression() {
     return expression;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/b5bf9a98/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
index fbe1706..da02cb2 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.common.configuration.profiler;
 
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
@@ -61,10 +63,16 @@ public class ProfileTriageExpressions {
     return expressions.get(name);
   }
 
+  @JsonAnyGetter
   public Map<String, String> getExpressions() {
     return expressions;
   }
 
+  @JsonAnySetter
+  public void setExpressions(Map<String, String> expressions) {
+    this.expressions = expressions;
+  }
+
   @Override
   public String toString() {
     return "ProfileTriageExpressions{" +

http://git-wip-us.apache.org/repos/asf/metron/blob/b5bf9a98/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
index 0bdb7e2..e4fa99a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java
@@ -17,6 +17,17 @@
  */
 package org.apache.metron.common.configuration.profiler;
 
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonSetter;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.metron.common.utils.JSONUtils;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.map.annotate.JsonSerialize.Inclusion;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -25,6 +36,7 @@ import java.util.Optional;
 /**
  * The configuration object for the Profiler, which may contain many Profile definitions.
  */
+@JsonSerialize(include=Inclusion.NON_NULL)
 public class ProfilerConfig implements Serializable {
 
   /**
@@ -59,10 +71,16 @@ public class ProfilerConfig implements Serializable {
     return this;
   }
 
+  @JsonGetter("timestampField")
+  public String getTimestampFieldForJson() {
+    return timestampField.orElse(null);
+  }
+
   public Optional<String> getTimestampField() {
     return timestampField;
   }
 
+  @JsonSetter("timestampField")
   public void setTimestampField(String timestampField) {
     this.timestampField = Optional.of(timestampField);
   }
@@ -78,25 +96,66 @@ public class ProfilerConfig implements Serializable {
 
   @Override
   public String toString() {
-    return "ProfilerConfig{" +
-            "profiles=" + profiles +
-            ", timestampField='" + timestampField + '\'' +
-            '}';
+    return new ToStringBuilder(this)
+            .append("profiles", profiles)
+            .append("timestampField", timestampField)
+            .toString();
   }
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
     ProfilerConfig that = (ProfilerConfig) o;
-    if (profiles != null ? !profiles.equals(that.profiles) : that.profiles != null) return false;
-    return timestampField != null ? timestampField.equals(that.timestampField) : that.timestampField == null;
+    return new EqualsBuilder()
+            .append(profiles, that.profiles)
+            .append(timestampField, that.timestampField)
+            .isEquals();
   }
 
   @Override
   public int hashCode() {
-    int result = profiles != null ? profiles.hashCode() : 0;
-    result = 31 * result + (timestampField != null ? timestampField.hashCode() : 0);
-    return result;
+    return new HashCodeBuilder(17, 37)
+            .append(profiles)
+            .append(timestampField)
+            .toHashCode();
+  }
+
+  /**
+   * Deserialize a {@link ProfilerConfig}.
+   *
+   * @param bytes Raw bytes containing a UTF-8 JSON String.
+   * @return The Profiler configuration.
+   * @throws IOException
+   */
+  public static ProfilerConfig fromBytes(byte[] bytes) throws IOException {
+    return JSONUtils.INSTANCE.load(new String(bytes), ProfilerConfig.class);
+  }
+
+  /**
+   * Deserialize a {@link ProfilerConfig}.
+   *
+   * @param json A String containing JSON.
+   * @return The Profiler configuration.
+   * @throws IOException
+   */
+  public static ProfilerConfig fromJSON(String json) throws IOException {
+    return JSONUtils.INSTANCE.load(json, ProfilerConfig.class);
+  }
+
+  /**
+   * Serialize a {@link ProfilerConfig} to a JSON string.
+   *
+   * @return The Profiler configuration serialized as a JSON string.
+   * @throws JsonProcessingException
+   */
+  public String toJSON() throws JsonProcessingException {
+    return JSONUtils.INSTANCE.toJSON(this, true);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/b5bf9a98/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
index e178ee0..87dbbc4 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java
@@ -21,7 +21,6 @@ package org.apache.metron.common.configuration.profiler;
 
 import com.fasterxml.jackson.databind.JsonMappingException;
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.utils.JSONUtils;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -51,12 +50,29 @@ public class ProfileConfigTest {
    * The 'onlyif' field should default to 'true' when it is not specified.
    */
   @Test
-  public void testOnlyIfDefault() throws IOException {
-    ProfileConfig profile = JSONUtils.INSTANCE.load(onlyIfDefault, ProfileConfig.class);
+  public void testFromJSONWithOnlyIfDefault() throws IOException {
+    ProfileConfig profile = ProfileConfig.fromJSON(onlyIfDefault);
     assertEquals("true", profile.getOnlyif());
   }
 
   /**
+   * Tests serializing the Profiler configuration to JSON.
+   */
+  @Test
+  public void testToJSONWithOnlyIfDefault() throws Exception {
+
+    // setup a profiler config to serialize
+    ProfileConfig expected = ProfileConfig.fromJSON(onlyIfDefault);
+
+    // execute the test - serialize the config
+    String asJson = expected.toJSON();
+
+    // validate - deserialize to validate
+    ProfileConfig actual = ProfileConfig.fromJSON(asJson);
+    assertEquals(expected, actual);
+  }
+
+  /**
    * {
    *    "foreach": "ip_src_addr",
    *    "update": {},
@@ -70,8 +86,8 @@ public class ProfileConfigTest {
    * The 'name' of the profile must be defined.
    */
   @Test(expected = JsonMappingException.class)
-  public void testNameMissing() throws IOException {
-    JSONUtils.INSTANCE.load(nameMissing, ProfileConfig.class);
+  public void testFromJSONWithNameMissing() throws IOException {
+    ProfileConfig.fromJSON(nameMissing);
   }
 
   /**
@@ -88,8 +104,8 @@ public class ProfileConfigTest {
    * The 'foreach' field must be defined.
    */
   @Test(expected = JsonMappingException.class)
-  public void testForeachMissing() throws IOException {
-    JSONUtils.INSTANCE.load(foreachMissing, ProfileConfig.class);
+  public void testFromJSONWithForeachMissing() throws IOException {
+    ProfileConfig.fromJSON(foreachMissing);
   }
 
   /**
@@ -106,8 +122,8 @@ public class ProfileConfigTest {
    * The 'result' field must be defined.
    */
   @Test(expected = JsonMappingException.class)
-  public void testResultMissing() throws IOException {
-    JSONUtils.INSTANCE.load(resultMissing, ProfileConfig.class);
+  public void testFromJSONWithResultMissing() throws IOException {
+    ProfileConfig.fromJSON(resultMissing);
   }
 
   /**
@@ -125,8 +141,8 @@ public class ProfileConfigTest {
    * The 'result' field must contain the 'profile' expression used to store the profile measurement.
    */
   @Test(expected = JsonMappingException.class)
-  public void testResultMissingProfileExpression() throws IOException {
-    JSONUtils.INSTANCE.load(resultMissingProfileExpression, ProfileConfig.class);
+  public void testFromJSONWithResultMissingProfileExpression() throws IOException {
+    ProfileConfig.fromJSON(resultMissingProfileExpression);
   }
 
   /**
@@ -145,8 +161,8 @@ public class ProfileConfigTest {
    * the 'profile' expression used to store the profile measurement.
    */
   @Test
-  public void testResultWithExpression() throws IOException {
-    ProfileConfig profile = JSONUtils.INSTANCE.load(resultWithExpression, ProfileConfig.class);
+  public void testFromJSONWithResultWithExpression() throws IOException {
+    ProfileConfig profile = ProfileConfig.fromJSON(resultWithExpression);
     assertEquals("2 + 2", profile.getResult().getProfileExpressions().getExpression());
 
     // no triage expressions expected
@@ -154,6 +170,23 @@ public class ProfileConfigTest {
   }
 
   /**
+   * Tests serializing the Profiler configuration to JSON.
+   */
+  @Test
+  public void testToJSONWithResultWithExpression() throws Exception {
+
+    // setup a profiler config to serialize
+    ProfileConfig expected = ProfileConfig.fromJSON(resultWithExpression);
+
+    // execute the test - serialize the config
+    String asJson = expected.toJSON();
+
+    // validate - deserialize to validate
+    ProfileConfig actual = ProfileConfig.fromJSON(asJson);
+    assertEquals(expected, actual);
+  }
+
+  /**
    * {
    *    "profile": "test",
    *    "foreach": "ip_src_addr",
@@ -170,8 +203,8 @@ public class ProfileConfigTest {
    * The result's 'triage' field is optional.
    */
   @Test
-  public void testResultWithProfileOnly() throws IOException {
-    ProfileConfig profile = JSONUtils.INSTANCE.load(resultWithProfileOnly, ProfileConfig.class);
+  public void testFromJSONWithResultWithProfileOnly() throws IOException {
+    ProfileConfig profile = ProfileConfig.fromJSON(resultWithProfileOnly);
     assertEquals("2 + 2", profile.getResult().getProfileExpressions().getExpression());
 
     // no triage expressions expected
@@ -179,6 +212,23 @@ public class ProfileConfigTest {
   }
 
   /**
+   * Tests serializing the Profiler configuration to JSON.
+   */
+  @Test
+  public void testToJSONWithProfileOnly() throws Exception {
+
+    // setup a profiler config to serialize
+    ProfileConfig expected = ProfileConfig.fromJSON(resultWithProfileOnly);
+
+    // execute the test - serialize the config
+    String asJson = expected.toJSON();
+
+    // validate - deserialize to validate
+    ProfileConfig actual = ProfileConfig.fromJSON(asJson);
+    assertEquals(expected, actual);
+  }
+
+  /**
    * {
    *    "profile": "test",
    *    "foreach": "ip_src_addr",
@@ -199,10 +249,28 @@ public class ProfileConfigTest {
    * The result's 'triage' field can contain many named expressions.
    */
   @Test
-  public void testResultWithTriage() throws IOException {
-    ProfileConfig profile = JSONUtils.INSTANCE.load(resultWithTriage, ProfileConfig.class);
+  public void testFromJSONWithResultWithTriage() throws IOException {
+    ProfileConfig profile = ProfileConfig.fromJSON(resultWithTriage);
 
     assertEquals("4 + 4", profile.getResult().getTriageExpressions().getExpression("eight"));
     assertEquals("8 + 8", profile.getResult().getTriageExpressions().getExpression("sixteen"));
   }
+
+  /**
+   * Tests serializing the Profiler configuration to JSON.
+   */
+  @Test
+  public void testToJSONWithResultWithTriage() throws Exception {
+
+    // setup a profiler config to serialize
+    ProfileConfig expected = ProfileConfig.fromJSON(resultWithTriage);
+
+    // execute the test - serialize the config
+    String asJson = expected.toJSON();
+
+    // validate - deserialize to validate
+    ProfileConfig actual = ProfileConfig.fromJSON(asJson);
+    assertEquals(expected, actual);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/b5bf9a98/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
index 2e73cde..1a11811 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java
@@ -20,7 +20,6 @@
 package org.apache.metron.common.configuration.profiler;
 
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.utils.JSONUtils;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -48,14 +47,41 @@ public class ProfilerConfigTest {
    * }
    */
   @Multiline
+  private String profile;
+
+  /**
+   * Tests deserializing the Profiler configuration using the fromJSON(...) method.
+   */
+  @Test
+  public void testFromJSON() throws IOException {
+    ProfilerConfig conf = ProfilerConfig.fromJSON(profile);
+
+    assertFalse(conf.getTimestampField().isPresent());
+    assertEquals(1, conf.getProfiles().size());
+  }
+
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result":   "count"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
   private String noTimestampField;
 
   /**
    * If no 'timestampField' is defined, it should not be present by default.
    */
   @Test
-  public void testNoTimestampField() throws IOException {
-    ProfilerConfig conf = JSONUtils.INSTANCE.load(noTimestampField, ProfilerConfig.class);
+  public void testFromJSONWithNoTimestampField() throws IOException {
+    ProfilerConfig conf = ProfilerConfig.fromJSON(noTimestampField);
     assertFalse(conf.getTimestampField().isPresent());
   }
 
@@ -77,11 +103,12 @@ public class ProfilerConfigTest {
   private String timestampField;
 
   /**
-   * If no 'timestampField' is defined, it should not be present by default.
+   * Tests deserializing the Profiler configuration when the timestamp field is defined.
    */
   @Test
-  public void testTimestampField() throws IOException {
-    ProfilerConfig conf = JSONUtils.INSTANCE.load(timestampField, ProfilerConfig.class);
+  public void testFromJSONWithTimestampField() throws IOException {
+    ProfilerConfig conf = ProfilerConfig.fromJSON(timestampField);
+
     assertTrue(conf.getTimestampField().isPresent());
   }
 
@@ -108,13 +135,75 @@ public class ProfilerConfigTest {
   @Multiline
   private String twoProfiles;
 
+  @Test
+  public void testFromJSONTwoProfiles() throws IOException {
+    ProfilerConfig conf = ProfilerConfig.fromJSON(twoProfiles);
+
+    assertEquals(2, conf.getProfiles().size());
+    assertFalse(conf.getTimestampField().isPresent());
+  }
+
   /**
-   * The 'onlyif' field should default to 'true' when it is not specified.
+   * Tests serializing the Profiler configuration to JSON.
    */
   @Test
-  public void testTwoProfiles() throws IOException {
-    ProfilerConfig conf = JSONUtils.INSTANCE.load(twoProfiles, ProfilerConfig.class);
-    assertEquals(2, conf.getProfiles().size());
+  public void testToJSON() throws Exception {
+
+    // setup a profiler config to serialize
+    ProfilerConfig expected = ProfilerConfig.fromJSON(profile);
+
+    // execute the test - serialize the config
+    String asJson = expected.toJSON();
+
+    // validate - deserialize to validate
+    ProfilerConfig actual = ProfilerConfig.fromJSON(asJson);
+    assertEquals(expected, actual);
   }
 
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result": {
+   *          "profile": "count",
+   *          "triage" : { "count": "count" }
+   *        }
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String profileWithTriageExpression;
+
+  @Test
+  public void testToJSONWithTriageExpression() throws Exception {
+
+    // setup a profiler config to serialize
+    ProfilerConfig expected = ProfilerConfig.fromJSON(profileWithTriageExpression);
+
+    // execute the test - serialize the config
+    String asJson = expected.toJSON();
+
+    // validate - deserialize to validate
+    ProfilerConfig actual = ProfilerConfig.fromJSON(asJson);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testToJSONWithTwoProfiles() throws Exception {
+
+    // setup a profiler config to serialize
+    ProfilerConfig expected = ProfilerConfig.fromJSON(twoProfiles);
+
+    // execute the test - serialize the config
+    String asJson = expected.toJSON();
+
+    // validate - deserialize to validate
+    ProfilerConfig actual = ProfilerConfig.fromJSON(asJson);
+    assertEquals(expected, actual);
+  }
 }


[10/11] metron git commit: METRON-1541 Mvn clean results in git status having deleted files. (justinleet via nickwallen) closes apache/metron#1003

Posted by ot...@apache.org.
METRON-1541 Mvn clean results in git status having deleted files. (justinleet via nickwallen) closes apache/metron#1003


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/a17c1adf
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/a17c1adf
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/a17c1adf

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: a17c1adfd4cca63a49ef542c21cef90b18c74be0
Parents: d7edce9
Author: justinleet <ju...@gmail.com>
Authored: Wed May 9 16:47:29 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed May 9 16:47:29 2018 -0400

----------------------------------------------------------------------
 metron-deployment/packaging/ambari/.gitignore   |  2 +
 .../enrichment-splitjoin.properties.j2          | 63 --------------------
 .../templates/enrichment-unified.properties.j2  | 60 -------------------
 3 files changed, 2 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/a17c1adf/metron-deployment/packaging/ambari/.gitignore
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/.gitignore b/metron-deployment/packaging/ambari/.gitignore
index ca2e75c..2f93166 100644
--- a/metron-deployment/packaging/ambari/.gitignore
+++ b/metron-deployment/packaging/ambari/.gitignore
@@ -3,3 +3,5 @@ archive.zip
 elasticsearch.properties.j2
 hdfs.properties.j2
 enrichment.properties.j2
+enrichment-splitjoin.properties.j2
+enrichment-unified.properties.j2

http://git-wip-us.apache.org/repos/asf/metron/blob/a17c1adf/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment-splitjoin.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment-splitjoin.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment-splitjoin.properties.j2
deleted file mode 100644
index a0b21c9..0000000
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment-splitjoin.properties.j2
+++ /dev/null
@@ -1,63 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-##### Storm #####
-enrichment.workers={{enrichment_workers}}
-enrichment.acker.executors={{enrichment_acker_executors}}
-topology.worker.childopts={{enrichment_topology_worker_childopts}}
-topology.auto-credentials={{topology_auto_credentials}}
-topology.max.spout.pending={{enrichment_topology_max_spout_pending}}
-
-##### Kafka #####
-kafka.zk={{zookeeper_quorum}}
-kafka.broker={{kafka_brokers}}
-kafka.security.protocol={{kafka_security_protocol}}
-
-# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
-kafka.start={{enrichment_kafka_start}}
-
-enrichment.input.topic={{enrichment_input_topic}}
-enrichment.output.topic={{enrichment_output_topic}}
-enrichment.error.topic={{enrichment_error_topic}}
-threat.intel.error.topic={{threatintel_error_topic}}
-
-##### JoinBolt #####
-enrichment.join.cache.size={{enrichment_join_cache_size}}
-threat.intel.join.cache.size={{threatintel_join_cache_size}}
-
-##### Enrichment #####
-hbase.provider.impl={{enrichment_hbase_provider_impl}}
-enrichment.simple.hbase.table={{enrichment_hbase_table}}
-enrichment.simple.hbase.cf={{enrichment_hbase_cf}}
-enrichment.host.known_hosts={{enrichment_host_known_hosts}}
-
-##### Threat Intel #####
-threat.intel.tracker.table={{threatintel_hbase_table}}
-threat.intel.tracker.cf={{threatintel_hbase_cf}}
-threat.intel.simple.hbase.table={{threatintel_hbase_table}}
-threat.intel.simple.hbase.cf={{threatintel_hbase_cf}}
-
-##### Parallelism #####
-kafka.spout.parallelism={{enrichment_kafka_spout_parallelism}}
-enrichment.split.parallelism={{enrichment_split_parallelism}}
-enrichment.stellar.parallelism={{enrichment_stellar_parallelism}}
-enrichment.join.parallelism={{enrichment_join_parallelism}}
-threat.intel.split.parallelism={{threat_intel_split_parallelism}}
-threat.intel.stellar.parallelism={{threat_intel_stellar_parallelism}}
-threat.intel.join.parallelism={{threat_intel_join_parallelism}}
-kafka.writer.parallelism={{kafka_writer_parallelism}}

http://git-wip-us.apache.org/repos/asf/metron/blob/a17c1adf/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment-unified.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment-unified.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment-unified.properties.j2
deleted file mode 100644
index 8c28c49..0000000
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment-unified.properties.j2
+++ /dev/null
@@ -1,60 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-##### Storm #####
-enrichment.workers={{enrichment_workers}}
-enrichment.acker.executors={{enrichment_acker_executors}}
-topology.worker.childopts={{enrichment_topology_worker_childopts}}
-topology.auto-credentials={{topology_auto_credentials}}
-topology.max.spout.pending={{enrichment_topology_max_spout_pending}}
-
-##### Kafka #####
-kafka.zk={{zookeeper_quorum}}
-kafka.broker={{kafka_brokers}}
-kafka.security.protocol={{kafka_security_protocol}}
-kafka.start={{enrichment_kafka_start}}
-enrichment.input.topic={{enrichment_input_topic}}
-enrichment.output.topic={{enrichment_output_topic}}
-enrichment.error.topic={{enrichment_error_topic}}
-threat.intel.error.topic={{threatintel_error_topic}}
-
-##### Enrichment #####
-hbase.provider.impl={{enrichment_hbase_provider_impl}}
-enrichment.simple.hbase.table={{enrichment_hbase_table}}
-enrichment.simple.hbase.cf={{enrichment_hbase_cf}}
-enrichment.host.known_hosts={{enrichment_host_known_hosts}}
-
-##### Threat Intel #####
-threat.intel.tracker.table={{threatintel_hbase_table}}
-threat.intel.tracker.cf={{threatintel_hbase_cf}}
-threat.intel.simple.hbase.table={{threatintel_hbase_table}}
-threat.intel.simple.hbase.cf={{threatintel_hbase_cf}}
-
-##### Parallelism #####
-kafka.spout.parallelism={{unified_kafka_spout_parallelism}}
-enrichment.parallelism={{unified_enrichment_parallelism}}
-threat.intel.parallelism={{unified_threat_intel_parallelism}}
-kafka.writer.parallelism={{unified_kafka_writer_parallelism}}
-
-##### Caches #####
-enrichment.cache.size={{unified_enrichment_cache_size}}
-threat.intel.cache.size={{unified_threat_intel_cache_size}}
-
-##### Threads #####
-enrichment.threadpool.size={{unified_enrichment_threadpool_size}}
-enrichment.threadpool.type={{unified_enrichment_threadpool_type}}


[09/11] metron git commit: METRON-1461 MIN MAX stellar function should take a stats or list object and return min/max (MohanDV via nickwallen) closes apache/metron#942

Posted by ot...@apache.org.
METRON-1461 MIN MAX stellar function should take a stats or list object and return min/max (MohanDV via nickwallen) closes apache/metron#942


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d7edce97
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d7edce97
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d7edce97

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: d7edce974341a6fca0e8e87b11baed5ad0d5d0c1
Parents: ca4644b
Author: MohanDV <mo...@gmail.com>
Authored: Wed May 9 16:33:32 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed May 9 16:33:32 2018 -0400

----------------------------------------------------------------------
 .../metron/statistics/StatisticsProvider.java   |  6 +--
 .../metron/stellar/dsl/functions/Ordinal.java   | 36 +++++++++++++++
 .../stellar/dsl/functions/OrdinalFunctions.java | 48 +++++++++++++-------
 .../dsl/functions/OrdinalFunctionsTest.java     | 41 ++++++++++++++++-
 4 files changed, 110 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/d7edce97/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/StatisticsProvider.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/StatisticsProvider.java b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/StatisticsProvider.java
index 860aa4e..e737484 100644
--- a/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/StatisticsProvider.java
+++ b/metron-analytics/metron-statistics/src/main/java/org/apache/metron/statistics/StatisticsProvider.java
@@ -20,14 +20,14 @@
 package org.apache.metron.statistics;
 
 
+import org.apache.metron.stellar.dsl.functions.Ordinal;
+
 /**
  * Provides statistical functions.
  */
-public interface StatisticsProvider {
+public interface StatisticsProvider extends Ordinal{
   void addValue(double value);
   long getCount();
-  double getMin();
-  double getMax();
   double getMean();
   double getSum();
   double getVariance();

http://git-wip-us.apache.org/repos/asf/metron/blob/d7edce97/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/Ordinal.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/Ordinal.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/Ordinal.java
new file mode 100644
index 0000000..d3bd9ce
--- /dev/null
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/Ordinal.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.dsl.functions;
+
+/**
+ * Interface that provides the statistical function get max and min from the implementing object.
+ */
+public interface Ordinal {
+
+    /**
+     * get the min value
+     * @return min value
+     */
+    double getMin();
+
+    /**
+     * get the max value
+     * @return max value
+     */
+    double getMax();
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/d7edce97/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctions.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctions.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctions.java
index 49e9369..6ac9ff5 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctions.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctions.java
@@ -22,12 +22,10 @@ import com.google.common.collect.Iterables;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.dsl.BaseStellarFunction;
 import org.apache.metron.stellar.dsl.Stellar;
-
-import java.util.Collections;
 import java.util.List;
 import java.util.function.BiFunction;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
+import static org.apache.metron.stellar.common.utils.ConversionUtils.convert;
+
 
 public class OrdinalFunctions {
 
@@ -37,19 +35,29 @@ public class OrdinalFunctions {
    * Return the maximum value of a list of input values in a Stellar list
    */
   @Stellar(name = "MAX"
-          , description = "Returns the maximum value of a list of input values"
-          , params = {"list - List of arguments. The list may only contain objects that are mutually comparable / ordinal (implement java.lang.Comparable interface)" +
+          , description = "Returns the maximum value of a list of input values or from a statistics object"
+          , params = {"stats - The Stellar statistics object"
+          ,"list - List of arguments. The list may only contain objects that are mutually comparable / ordinal (implement java.lang.Comparable interface)" +
           " Multi type numeric comparisons are supported: MAX([10,15L,15.3]) would return 15.3, but MAX(['23',25]) will fail and return null as strings and numbers can't be compared."}
-          , returns = "The maximum value in the list, or null if the list is empty or the input values were not comparable.")
+          , returns = "The maximum value in the list or from stats, or null if the list is empty or the input values were not comparable.")
   public static class Max extends BaseStellarFunction {
 
     @Override
     public Object apply(List<Object> args) {
       if (args.size() < 1 || args.get(0) == null) {
-        throw new IllegalStateException("MAX function requires at least a Stellar list of values");
+        throw new IllegalStateException("MAX function requires at least one argument");
+      }
+      Object firstArg = args.get(0);
+      if(firstArg instanceof Ordinal) {
+        Ordinal stats = convert(firstArg, Ordinal.class);
+        return stats.getMax();
+      } else if (firstArg instanceof Iterable) {
+        Iterable<Comparable> list = (Iterable<Comparable>) args.get(0);
+        return orderList(list, (ret, val) -> ret.compareTo(val) < 0, "MAX");
+      } else {
+        throw new IllegalStateException("MAX function expects either 'a StatisticsProvider object' or 'Stellar list of values'");
       }
-      Iterable list = (Iterable<Object>) args.get(0);
-      return orderList(list, (ret, val) -> ret.compareTo(val) < 0, "MAX");
+
     }
   }
 
@@ -60,18 +68,26 @@ public class OrdinalFunctions {
    */
   @Stellar(name = "MIN"
           , description = "Returns the minimum value of a list of input values"
-          , params = {"list - List of arguments. The list may only contain objects that are mutually comparable / ordinal (implement java.lang.Comparable interface)" +
+          , params = {"stats - The Stellar statistics object"
+          ,"list - List of arguments. The list may only contain objects that are mutually comparable / ordinal (implement java.lang.Comparable interface)" +
           " Multi type numeric comparisons are supported: MIN([10,15L,15.3]) would return 10, but MIN(['23',25]) will fail and return null as strings and numbers can't be compared."}
-          , returns = "The minimum value in the list, or null if the list is empty or the input values were not comparable.")
+          , returns = "The minimum value in the list or from stats, or null if the list is empty or the input values were not comparable.")
   public static class Min extends BaseStellarFunction {
     @Override
     public Object apply(List<Object> args) {
       if (args.size() < 1 || args.get(0) == null) {
-        throw new IllegalStateException("MIN function requires at least a Stellar list of values");
+        throw new IllegalStateException("MIN function requires at least one argument");
+      }
+      Object firstArg = args.get(0);
+      if(firstArg instanceof Ordinal) {
+        Ordinal stats = convert(firstArg, Ordinal.class);
+        return stats.getMin();
+      } else if (firstArg instanceof Iterable){
+        Iterable<Comparable> list = (Iterable<Comparable>) args.get(0);
+        return orderList(list, (ret, val) -> ret.compareTo(val) > 0, "MIN");
+      } else {
+        throw new IllegalStateException("MIN function expects either 'a StatisticsProvider object' or 'Stellar list of values' ");
       }
-      Iterable<Comparable> list = (Iterable<Comparable>) args.get(0);
-      return orderList(list, (ret, val) -> ret.compareTo(val) > 0, "MIN");
-
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/d7edce97/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctionsTest.java b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctionsTest.java
index 5e06fdd..e405c7f 100644
--- a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctionsTest.java
+++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/dsl/functions/OrdinalFunctionsTest.java
@@ -20,7 +20,6 @@ package org.apache.metron.stellar.dsl.functions;
 
 
 import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.stellar.common.StellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.DefaultVariableResolver;
@@ -28,7 +27,6 @@ import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
 import java.util.*;
@@ -220,6 +218,45 @@ public class OrdinalFunctionsTest {
     }
   }
 
+
+  @Test
+  public void testMaxOfStats() throws Exception {
+    Ordinal provider = new Ordinal() {
+      @Override
+      public double getMin() {
+        return 10;
+      }
+
+      @Override
+      public double getMax() {
+        return 100;
+      }
+    };
+
+    Object res = run("MAX(input_list)", ImmutableMap.of("input_list", provider));
+    Assert.assertNotNull(res);
+    Assert.assertTrue(res.equals(100.0d));
+  }
+
+  @Test
+  public void testMinOfStats() throws Exception {
+    Ordinal provider = new Ordinal() {
+      @Override
+      public double getMin() {
+        return 10;
+      }
+
+      @Override
+      public double getMax() {
+        return 100;
+      }
+    };
+
+    Object res = run("MIN(input_list)", ImmutableMap.of("input_list", provider));
+    Assert.assertNotNull(res);
+    Assert.assertTrue(res.equals(10.0d));
+  }
+
   public Object run(String rule, Map<String, Object> variables) throws Exception {
     StellarProcessor processor = new StellarProcessor();
     return processor.parse(rule, new DefaultVariableResolver(x -> variables.get(x), x -> variables.containsKey(x)), StellarFunctions.FUNCTION_RESOLVER(), context);


[08/11] metron git commit: METRON-1184 EC2 Deployment - Updating control_path to accommodate for Linux (Ahmed Shah via ottobackwards) closes apache/metron#754

Posted by ot...@apache.org.
METRON-1184 EC2 Deployment - Updating control_path to accommodate for Linux (Ahmed Shah via ottobackwards) closes apache/metron#754


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/ca4644ba
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/ca4644ba
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/ca4644ba

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: ca4644ba5e94c579fed6a7c7d1e8bea05fe9ba9c
Parents: 1b1a45b
Author: Ahmed Shah <a....@ieee.org>
Authored: Wed May 9 14:22:50 2018 -0400
Committer: otto <ot...@apache.org>
Committed: Wed May 9 14:22:50 2018 -0400

----------------------------------------------------------------------
 metron-deployment/amazon-ec2/ansible.cfg | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/ca4644ba/metron-deployment/amazon-ec2/ansible.cfg
----------------------------------------------------------------------
diff --git a/metron-deployment/amazon-ec2/ansible.cfg b/metron-deployment/amazon-ec2/ansible.cfg
index 1f4f0ea..0fe3343 100644
--- a/metron-deployment/amazon-ec2/ansible.cfg
+++ b/metron-deployment/amazon-ec2/ansible.cfg
@@ -24,5 +24,8 @@ forks = 20
 log_path = ./ansible.log
 
 # fix for "ssh throws 'unix domain socket too long' " problem
+#[ssh_connection]
+#control_path = ~/.ssh/ansible-ssh-%%C
+
 [ssh_connection]
-control_path = ~/.ssh/ansible-ssh-%%C
+control_path = ~/.ssh/ansbile-ssh-%%h-%%r


[05/11] metron git commit: METRON-1543 Unable to Set Parser Output Topic in Sensor Config (nickwallen) closes apache/metron#1007

Posted by ot...@apache.org.
METRON-1543 Unable to Set Parser Output Topic in Sensor Config (nickwallen) closes apache/metron#1007


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/3bb926df
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/3bb926df
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/3bb926df

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 3bb926df5d253a907bbf8dab4b76b78dd32993ea
Parents: 2b4f0b8
Author: nickwallen <ni...@nickallen.org>
Authored: Wed May 2 15:06:03 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed May 2 15:06:03 2018 -0400

----------------------------------------------------------------------
 .../org/apache/metron/common/Constants.java     |  10 +-
 .../configuration/SensorParserConfig.java       | 410 +++++++++++--------
 .../parsers/topology/ParserTopologyBuilder.java | 139 ++++---
 .../parsers/topology/ParserTopologyCLI.java     | 147 +++++--
 .../components/ParserTopologyComponent.java     |  80 ++--
 .../parsers/topology/ParserTopologyCLITest.java | 122 ++++--
 ...pleHbaseEnrichmentWriterIntegrationTest.java |  69 ++--
 .../integration/WriterBoltIntegrationTest.java  | 109 ++---
 .../apache/metron/writer/kafka/KafkaWriter.java |   5 +
 9 files changed, 676 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
index b939a92..12b541c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/Constants.java
@@ -17,9 +17,7 @@
  */
 package org.apache.metron.common;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class Constants {
@@ -37,9 +35,17 @@ public class Constants {
   public static final String SIMPLE_HBASE_THREAT_INTEL = "hbaseThreatIntel";
   public static final String GUID = "guid";
 
+  /**
+   * The key in the global configuration that defines the global parser error topic.
+   *
+   * <p>This value is used only if the error topic is left undefined in a sensor's parser configuration.
+   */
+  public static final String PARSER_ERROR_TOPIC_GLOBALS_KEY = "parser.error.topic";
+
   public interface Field {
     String getName();
   }
+
   public enum Fields implements Field {
      SRC_ADDR("ip_src_addr")
     ,SRC_PORT("ip_src_port")

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index d347481..1dfb045 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -18,6 +18,9 @@
 package org.apache.metron.common.configuration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.metron.common.utils.JSONUtils;
 
 import java.io.IOException;
@@ -27,35 +30,171 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * The configuration object that defines a parser for a given sensor.  Each
+ * sensor has its own parser configuration.
+ */
 public class SensorParserConfig implements Serializable {
 
+  /**
+   * The class name of the parser.
+   */
   private String parserClassName;
+
+  /**
+   * Allows logic to be defined to filter or ignore messages.  Messages that have been
+   * filtered will not be parsed.
+   *
+   * This should be a fully qualified name of a class that implements the
+   * org.apache.metron.parsers.interfaces.MessageFilter interface.
+   */
   private String filterClassName;
+
+  /**
+   * The input topic containing the sensor telemetry to parse.
+   */
   private String sensorTopic;
+
+  /**
+   * The output topic where the parsed telemetry will be written.
+   */
+  private String outputTopic;
+
+  /**
+   * The error topic where errors are written to.
+   */
+  private String errorTopic;
+
+  /**
+   * The fully qualified name of a class used to write messages
+   * to the output topic.
+   *
+   * <p>A sensible default is provided.
+   */
   private String writerClassName;
+
+  /**
+   * The fully qualified name of a class used to write messages
+   * to the error topic.
+   *
+   * <p>A sensible default is provided.
+   */
   private String errorWriterClassName;
-  private String invalidWriterClassName;
+
+  /**
+   * Determines if parser metadata is made available to the parser's field
+   * transformations. If true, the parser field transformations can access
+   * parser metadata values.
+   *
+   * <p>By default, this is false and parser metadata is not available
+   * to the field transformations.
+   */
   private Boolean readMetadata = false;
+
+  /**
+   * Determines if parser metadata is automatically merged into the message.  If
+   * true, parser metadata values will appear as fields within the message.
+   *
+   * <p>By default, this is false and metadata is not merged.
+   */
   private Boolean mergeMetadata = false;
+
+  /**
+   * The number of workers for the topology.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer numWorkers = null;
+
+  /**
+   * The number of ackers for the topology.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer numAckers= null;
+
+  /**
+   * The parallelism of the Kafka spout.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer spoutParallelism = 1;
+
+  /**
+   * The number of tasks for the Kafka spout.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer spoutNumTasks = 1;
+
+  /**
+   * The parallelism of the parser bolt.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer parserParallelism = 1;
+
+  /**
+   * The number of tasks for the parser bolt.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer parserNumTasks = 1;
+
+  /**
+   * The parallelism of the error writer bolt.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer errorWriterParallelism = 1;
+
+  /**
+   * The number of tasks for the error writer bolt.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Integer errorWriterNumTasks = 1;
-  private Map<String, Object> cacheConfig = new HashMap<>();
+
+  /**
+   * Configuration properties passed to the Kafka spout.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Map<String, Object> spoutConfig = new HashMap<>();
+
+  /**
+   * The Kafka security protocol.
+   *
+   * <p>This property can be overridden on the CLI.  This property can also be overridden by the spout config.
+   */
   private String securityProtocol = null;
+
+  /**
+   * Configuration properties passed to the storm topology.
+   *
+   * <p>This property can be overridden on the CLI.
+   */
   private Map<String, Object> stormConfig = new HashMap<>();
 
   /**
-   * Cache config for stellar field transformations.
-   * * stellar.cache.maxSize - The maximum number of elements in the cache.
-   * * stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes).
-   * @return
+   * Configuration for the parser.
+   */
+  private Map<String, Object> parserConfig = new HashMap<>();
+
+  /**
+   * The field transformations applied to the parsed messages. These allow fields
+   * of the parsed message to be transformed.
    */
+  private List<FieldTransformer> fieldTransformations = new ArrayList<>();
+
+  /**
+   * Configures the cache that backs stellar field transformations.
+   *
+   * <li>stellar.cache.maxSize - The maximum number of elements in the cache.
+   * <li>stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes).
+   */
+  private Map<String, Object> cacheConfig = new HashMap<>();
+
   public Map<String, Object> getCacheConfig() {
     return cacheConfig;
   }
@@ -64,10 +203,6 @@ public class SensorParserConfig implements Serializable {
     this.cacheConfig = cacheConfig;
   }
 
-  /**
-   * Return the number of workers for the topology.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getNumWorkers() {
     return numWorkers;
   }
@@ -76,10 +211,6 @@ public class SensorParserConfig implements Serializable {
     this.numWorkers = numWorkers;
   }
 
-  /**
-   * Return the number of ackers for the topology.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getNumAckers() {
     return numAckers;
   }
@@ -88,10 +219,6 @@ public class SensorParserConfig implements Serializable {
     this.numAckers = numAckers;
   }
 
-  /**
-   * Return the spout parallelism.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getSpoutParallelism() {
     return spoutParallelism;
   }
@@ -100,10 +227,6 @@ public class SensorParserConfig implements Serializable {
     this.spoutParallelism = spoutParallelism;
   }
 
-  /**
-   * Return the spout num tasks.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getSpoutNumTasks() {
     return spoutNumTasks;
   }
@@ -112,10 +235,6 @@ public class SensorParserConfig implements Serializable {
     this.spoutNumTasks = spoutNumTasks;
   }
 
-  /**
-   * Return the parser parallelism.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getParserParallelism() {
     return parserParallelism;
   }
@@ -124,10 +243,6 @@ public class SensorParserConfig implements Serializable {
     this.parserParallelism = parserParallelism;
   }
 
-  /**
-   * Return the parser number of tasks.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getParserNumTasks() {
     return parserNumTasks;
   }
@@ -136,10 +251,6 @@ public class SensorParserConfig implements Serializable {
     this.parserNumTasks = parserNumTasks;
   }
 
-  /**
-   * Return the error writer bolt parallelism.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getErrorWriterParallelism() {
     return errorWriterParallelism;
   }
@@ -148,10 +259,6 @@ public class SensorParserConfig implements Serializable {
     this.errorWriterParallelism = errorWriterParallelism;
   }
 
-  /**
-   * Return the error writer bolt number of tasks.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Integer getErrorWriterNumTasks() {
     return errorWriterNumTasks;
   }
@@ -160,10 +267,6 @@ public class SensorParserConfig implements Serializable {
     this.errorWriterNumTasks = errorWriterNumTasks;
   }
 
-  /**
-   * Return the spout config.  This includes kafka properties.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Map<String, Object> getSpoutConfig() {
     return spoutConfig;
   }
@@ -172,11 +275,6 @@ public class SensorParserConfig implements Serializable {
     this.spoutConfig = spoutConfig;
   }
 
-  /**
-   * Return security protocol to use.  This property will be used for the parser unless overridden on the CLI.
-   * The order of precedence is CLI > spout config > config in the sensor parser config.
-   * @return
-   */
   public String getSecurityProtocol() {
     return securityProtocol;
   }
@@ -185,10 +283,6 @@ public class SensorParserConfig implements Serializable {
     this.securityProtocol = securityProtocol;
   }
 
-  /**
-   * Return Storm topologyconfig.  This property will be used for the parser unless overridden on the CLI.
-   * @return
-   */
   public Map<String, Object> getStormConfig() {
     return stormConfig;
   }
@@ -197,10 +291,6 @@ public class SensorParserConfig implements Serializable {
     this.stormConfig = stormConfig;
   }
 
-  /**
-   * Return whether or not to merge metadata sent into the message.  If true, then metadata become proper fields.
-   * @return
-   */
   public Boolean getMergeMetadata() {
     return mergeMetadata;
   }
@@ -209,10 +299,6 @@ public class SensorParserConfig implements Serializable {
     this.mergeMetadata = mergeMetadata;
   }
 
-  /**
-   * Return whether or not to read metadata at all.
-   * @return
-   */
   public Boolean getReadMetadata() {
     return readMetadata;
   }
@@ -229,22 +315,13 @@ public class SensorParserConfig implements Serializable {
     this.errorWriterClassName = errorWriterClassName;
   }
 
-  public String getInvalidWriterClassName() {
-    return invalidWriterClassName;
-  }
-
-  public void setInvalidWriterClassName(String invalidWriterClassName) {
-    this.invalidWriterClassName = invalidWriterClassName;
-  }
-
   public String getWriterClassName() {
     return writerClassName;
   }
+
   public void setWriterClassName(String classNames) {
     this.writerClassName = classNames;
   }
-  private Map<String, Object> parserConfig = new HashMap<>();
-  private List<FieldTransformer> fieldTransformations = new ArrayList<>();
 
   public List<FieldTransformer> getFieldTransformations() {
     return fieldTransformations;
@@ -278,6 +355,22 @@ public class SensorParserConfig implements Serializable {
     this.sensorTopic = sensorTopic;
   }
 
+  public String getOutputTopic() {
+    return outputTopic;
+  }
+
+  public void setOutputTopic(String outputTopic) {
+    this.outputTopic = outputTopic;
+  }
+
+  public String getErrorTopic() {
+    return errorTopic;
+  }
+
+  public void setErrorTopic(String errorTopic) {
+    this.errorTopic = errorTopic;
+  }
+
   public Map<String, Object> getParserConfig() {
     return parserConfig;
   }
@@ -298,112 +391,103 @@ public class SensorParserConfig implements Serializable {
     }
   }
 
-
   public String toJSON() throws JsonProcessingException {
     return JSONUtils.INSTANCE.toJSON(this, true);
   }
 
   @Override
-  public String toString() {
-    return "SensorParserConfig{" +
-            "parserClassName='" + parserClassName + '\'' +
-            ", filterClassName='" + filterClassName + '\'' +
-            ", sensorTopic='" + sensorTopic + '\'' +
-            ", writerClassName='" + writerClassName + '\'' +
-            ", errorWriterClassName='" + errorWriterClassName + '\'' +
-            ", invalidWriterClassName='" + invalidWriterClassName + '\'' +
-            ", readMetadata=" + readMetadata +
-            ", mergeMetadata=" + mergeMetadata +
-            ", numWorkers=" + numWorkers +
-            ", numAckers=" + numAckers +
-            ", spoutParallelism=" + spoutParallelism +
-            ", spoutNumTasks=" + spoutNumTasks +
-            ", parserParallelism=" + parserParallelism +
-            ", parserNumTasks=" + parserNumTasks +
-            ", errorWriterParallelism=" + errorWriterParallelism +
-            ", errorWriterNumTasks=" + errorWriterNumTasks +
-            ", spoutConfig=" + spoutConfig +
-            ", securityProtocol='" + securityProtocol + '\'' +
-            ", stormConfig=" + stormConfig +
-            ", parserConfig=" + parserConfig +
-            ", fieldTransformations=" + fieldTransformations +
-            '}';
-  }
-
-  @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    SensorParserConfig that = (SensorParserConfig) o;
+    if (this == o) {
+      return true;
+    }
 
-    if (getParserClassName() != null ? !getParserClassName().equals(that.getParserClassName()) : that.getParserClassName() != null)
-      return false;
-    if (getFilterClassName() != null ? !getFilterClassName().equals(that.getFilterClassName()) : that.getFilterClassName() != null)
-      return false;
-    if (getSensorTopic() != null ? !getSensorTopic().equals(that.getSensorTopic()) : that.getSensorTopic() != null)
-      return false;
-    if (getWriterClassName() != null ? !getWriterClassName().equals(that.getWriterClassName()) : that.getWriterClassName() != null)
-      return false;
-    if (getErrorWriterClassName() != null ? !getErrorWriterClassName().equals(that.getErrorWriterClassName()) : that.getErrorWriterClassName() != null)
-      return false;
-    if (getInvalidWriterClassName() != null ? !getInvalidWriterClassName().equals(that.getInvalidWriterClassName()) : that.getInvalidWriterClassName() != null)
-      return false;
-    if (getReadMetadata() != null ? !getReadMetadata().equals(that.getReadMetadata()) : that.getReadMetadata() != null)
-      return false;
-    if (getMergeMetadata() != null ? !getMergeMetadata().equals(that.getMergeMetadata()) : that.getMergeMetadata() != null)
+    if (o == null || getClass() != o.getClass()) {
       return false;
-    if (getNumWorkers() != null ? !getNumWorkers().equals(that.getNumWorkers()) : that.getNumWorkers() != null)
-      return false;
-    if (getNumAckers() != null ? !getNumAckers().equals(that.getNumAckers()) : that.getNumAckers() != null)
-      return false;
-    if (getSpoutParallelism() != null ? !getSpoutParallelism().equals(that.getSpoutParallelism()) : that.getSpoutParallelism() != null)
-      return false;
-    if (getSpoutNumTasks() != null ? !getSpoutNumTasks().equals(that.getSpoutNumTasks()) : that.getSpoutNumTasks() != null)
-      return false;
-    if (getParserParallelism() != null ? !getParserParallelism().equals(that.getParserParallelism()) : that.getParserParallelism() != null)
-      return false;
-    if (getParserNumTasks() != null ? !getParserNumTasks().equals(that.getParserNumTasks()) : that.getParserNumTasks() != null)
-      return false;
-    if (getErrorWriterParallelism() != null ? !getErrorWriterParallelism().equals(that.getErrorWriterParallelism()) : that.getErrorWriterParallelism() != null)
-      return false;
-    if (getErrorWriterNumTasks() != null ? !getErrorWriterNumTasks().equals(that.getErrorWriterNumTasks()) : that.getErrorWriterNumTasks() != null)
-      return false;
-    if (getSpoutConfig() != null ? !getSpoutConfig().equals(that.getSpoutConfig()) : that.getSpoutConfig() != null)
-      return false;
-    if (getSecurityProtocol() != null ? !getSecurityProtocol().equals(that.getSecurityProtocol()) : that.getSecurityProtocol() != null)
-      return false;
-    if (getStormConfig() != null ? !getStormConfig().equals(that.getStormConfig()) : that.getStormConfig() != null)
-      return false;
-    if (getParserConfig() != null ? !getParserConfig().equals(that.getParserConfig()) : that.getParserConfig() != null)
-      return false;
-    return getFieldTransformations() != null ? getFieldTransformations().equals(that.getFieldTransformations()) : that.getFieldTransformations() == null;
+    }
 
+    SensorParserConfig that = (SensorParserConfig) o;
+    return new EqualsBuilder()
+            .append(parserClassName, that.parserClassName)
+            .append(filterClassName, that.filterClassName)
+            .append(sensorTopic, that.sensorTopic)
+            .append(outputTopic, that.outputTopic)
+            .append(errorTopic, that.errorTopic)
+            .append(writerClassName, that.writerClassName)
+            .append(errorWriterClassName, that.errorWriterClassName)
+            .append(readMetadata, that.readMetadata)
+            .append(mergeMetadata, that.mergeMetadata)
+            .append(numWorkers, that.numWorkers)
+            .append(numAckers, that.numAckers)
+            .append(spoutParallelism, that.spoutParallelism)
+            .append(spoutNumTasks, that.spoutNumTasks)
+            .append(parserParallelism, that.parserParallelism)
+            .append(parserNumTasks, that.parserNumTasks)
+            .append(errorWriterParallelism, that.errorWriterParallelism)
+            .append(errorWriterNumTasks, that.errorWriterNumTasks)
+            .append(spoutConfig, that.spoutConfig)
+            .append(securityProtocol, that.securityProtocol)
+            .append(stormConfig, that.stormConfig)
+            .append(cacheConfig, that.cacheConfig)
+            .append(parserConfig, that.parserConfig)
+            .append(fieldTransformations, that.fieldTransformations)
+            .isEquals();
   }
 
   @Override
   public int hashCode() {
-    int result = getParserClassName() != null ? getParserClassName().hashCode() : 0;
-    result = 31 * result + (getFilterClassName() != null ? getFilterClassName().hashCode() : 0);
-    result = 31 * result + (getSensorTopic() != null ? getSensorTopic().hashCode() : 0);
-    result = 31 * result + (getWriterClassName() != null ? getWriterClassName().hashCode() : 0);
-    result = 31 * result + (getErrorWriterClassName() != null ? getErrorWriterClassName().hashCode() : 0);
-    result = 31 * result + (getInvalidWriterClassName() != null ? getInvalidWriterClassName().hashCode() : 0);
-    result = 31 * result + (getReadMetadata() != null ? getReadMetadata().hashCode() : 0);
-    result = 31 * result + (getMergeMetadata() != null ? getMergeMetadata().hashCode() : 0);
-    result = 31 * result + (getNumWorkers() != null ? getNumWorkers().hashCode() : 0);
-    result = 31 * result + (getNumAckers() != null ? getNumAckers().hashCode() : 0);
-    result = 31 * result + (getSpoutParallelism() != null ? getSpoutParallelism().hashCode() : 0);
-    result = 31 * result + (getSpoutNumTasks() != null ? getSpoutNumTasks().hashCode() : 0);
-    result = 31 * result + (getParserParallelism() != null ? getParserParallelism().hashCode() : 0);
-    result = 31 * result + (getParserNumTasks() != null ? getParserNumTasks().hashCode() : 0);
-    result = 31 * result + (getErrorWriterParallelism() != null ? getErrorWriterParallelism().hashCode() : 0);
-    result = 31 * result + (getErrorWriterNumTasks() != null ? getErrorWriterNumTasks().hashCode() : 0);
-    result = 31 * result + (getSpoutConfig() != null ? getSpoutConfig().hashCode() : 0);
-    result = 31 * result + (getSecurityProtocol() != null ? getSecurityProtocol().hashCode() : 0);
-    result = 31 * result + (getStormConfig() != null ? getStormConfig().hashCode() : 0);
-    result = 31 * result + (getParserConfig() != null ? getParserConfig().hashCode() : 0);
-    result = 31 * result + (getFieldTransformations() != null ? getFieldTransformations().hashCode() : 0);
-    return result;
+    return new HashCodeBuilder(17, 37)
+            .append(parserClassName)
+            .append(filterClassName)
+            .append(sensorTopic)
+            .append(outputTopic)
+            .append(errorTopic)
+            .append(writerClassName)
+            .append(errorWriterClassName)
+            .append(readMetadata)
+            .append(mergeMetadata)
+            .append(numWorkers)
+            .append(numAckers)
+            .append(spoutParallelism)
+            .append(spoutNumTasks)
+            .append(parserParallelism)
+            .append(parserNumTasks)
+            .append(errorWriterParallelism)
+            .append(errorWriterNumTasks)
+            .append(spoutConfig)
+            .append(securityProtocol)
+            .append(stormConfig)
+            .append(cacheConfig)
+            .append(parserConfig)
+            .append(fieldTransformations)
+            .toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+            .append("parserClassName", parserClassName)
+            .append("filterClassName", filterClassName)
+            .append("sensorTopic", sensorTopic)
+            .append("outputTopic", outputTopic)
+            .append("errorTopic", errorTopic)
+            .append("writerClassName", writerClassName)
+            .append("errorWriterClassName", errorWriterClassName)
+            .append("readMetadata", readMetadata)
+            .append("mergeMetadata", mergeMetadata)
+            .append("numWorkers", numWorkers)
+            .append("numAckers", numAckers)
+            .append("spoutParallelism", spoutParallelism)
+            .append("spoutNumTasks", spoutNumTasks)
+            .append("parserParallelism", parserParallelism)
+            .append("parserNumTasks", parserNumTasks)
+            .append("errorWriterParallelism", errorWriterParallelism)
+            .append("errorWriterNumTasks", errorWriterNumTasks)
+            .append("spoutConfig", spoutConfig)
+            .append("securityProtocol", securityProtocol)
+            .append("stormConfig", stormConfig)
+            .append("cacheConfig", cacheConfig)
+            .append("parserConfig", parserConfig)
+            .append("fieldTransformations", fieldTransformations)
+            .toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index 2865dd6..cd4ad50 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -84,7 +84,7 @@ public class ParserTopologyBuilder {
    * @param errorWriterNumTasksSupplier      Supplier for the number of tasks for the bolt that handles errors
    * @param kafkaSpoutConfigSupplier         Supplier for the configuration options for the kafka spout
    * @param securityProtocolSupplier         Supplier for the security protocol
-   * @param outputTopic                      The output kafka topic
+   * @param outputTopicSupplier              Supplier for the output kafka topic
    * @param stormConfigSupplier              Supplier for the storm config
    * @return A Storm topology that parses telemetry data received from an external sensor
    * @throws Exception
@@ -100,7 +100,8 @@ public class ParserTopologyBuilder {
                                       ValueSupplier<Integer> errorWriterNumTasksSupplier,
                                       ValueSupplier<Map> kafkaSpoutConfigSupplier,
                                       ValueSupplier<String> securityProtocolSupplier,
-                                      Optional<String> outputTopic,
+                                      ValueSupplier<String> outputTopicSupplier,
+                                      ValueSupplier<String> errorTopicSupplier,
                                       ValueSupplier<Config> stormConfigSupplier
   ) throws Exception {
 
@@ -113,24 +114,27 @@ public class ParserTopologyBuilder {
     int parserNumTasks = parserNumTasksSupplier.get(parserConfig, Integer.class);
     int errorWriterParallelism = errorWriterParallelismSupplier.get(parserConfig, Integer.class);
     int errorWriterNumTasks = errorWriterNumTasksSupplier.get(parserConfig, Integer.class);
+    String outputTopic = outputTopicSupplier.get(parserConfig, String.class);
+
     Map<String, Object> kafkaSpoutConfig = kafkaSpoutConfigSupplier.get(parserConfig, Map.class);
     Optional<String> securityProtocol = Optional.ofNullable(securityProtocolSupplier.get(parserConfig, String.class));
 
     // create the spout
     TopologyBuilder builder = new TopologyBuilder();
-    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig) , parserConfig);
+    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol, Optional.ofNullable(kafkaSpoutConfig), parserConfig);
     builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
             .setNumTasks(spoutNumTasks);
 
     // create the parser bolt
-    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, outputTopic);
+    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, Optional.of(outputTopic));
     builder.setBolt("parserBolt", parserBolt, parserParallelism)
             .setNumTasks(parserNumTasks)
             .localOrShuffleGrouping("kafkaSpout");
 
     // create the error bolt, if needed
     if (errorWriterNumTasks > 0) {
-      WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig);
+      String errorTopic = errorTopicSupplier.get(parserConfig, String.class);
+      WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol, configs, parserConfig, errorTopic);
       builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
               .setNumTasks(errorWriterNumTasks)
               .localOrShuffleGrouping("parserBolt", Constants.ERROR_STREAM);
@@ -176,24 +180,35 @@ public class ParserTopologyBuilder {
                                          );
   }
 
-  private static KafkaWriter createKafkaWriter( Optional<String> broker
-                                              , String zkQuorum
-                                              , Optional<String> securityProtocol
-                                              )
-  {
-    KafkaWriter ret = null;
+  /**
+   * Create a Kafka writer.
+   *
+   * @param broker An optional URL to the Kafka brokers.
+   * @param zkQuorum The URL to Zookeeper.
+   * @param securityProtocol An optional security protocol in use.
+   * @return
+   */
+  private static KafkaWriter createKafkaWriter(Optional<String> broker,
+                                               String zkQuorum,
+                                               Optional<String> securityProtocol) {
+    KafkaWriter writer = new KafkaWriter();
+
+    // cluster URL; either broker or zookeeper
     if(broker.isPresent()) {
-      ret = new KafkaWriter(broker.get());
-    }
-    else {
-      ret = new KafkaWriter().withZkQuorum(zkQuorum);
+      writer.withBrokerUrl(broker.get());
+
+    } else {
+      writer.withZkQuorum(zkQuorum);
     }
+
+    // security protocol
     if(securityProtocol.isPresent()) {
       HashMap<String, Object> config = new HashMap<>();
       config.put("security.protocol", securityProtocol.get());
-      ret.withProducerConfigs(config);
+      writer.withProducerConfigs(config);
     }
-    return ret;
+
+    return writer;
   }
 
   /**
@@ -206,27 +221,31 @@ public class ParserTopologyBuilder {
    * @param parserConfig
    * @return A Storm bolt that parses input from a sensor
    */
-  private static ParserBolt createParserBolt( String zookeeperUrl
-                                            , Optional<String> brokerUrl
-                                            , String sensorType
-                                            , Optional<String> securityProtocol
-                                            , ParserConfigurations configs
-                                            , SensorParserConfig parserConfig
-                                            , Optional<String> outputTopic
-                                            )
-  {
+  private static ParserBolt createParserBolt( String zookeeperUrl,
+                                              Optional<String> brokerUrl,
+                                              String sensorType,
+                                              Optional<String> securityProtocol,
+                                              ParserConfigurations configs,
+                                              SensorParserConfig parserConfig,
+                                              Optional<String> outputTopic) {
 
     // create message parser
     MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName());
     parser.configure(parserConfig.getParserConfig());
 
-    // create writer - if not configured uses a sensible default
-    AbstractWriter writer = parserConfig.getWriterClassName() == null ?
-            createKafkaWriter( brokerUrl
-                             , zookeeperUrl
-                             , securityProtocol
-                             ).withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC)) :
-            ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    // create a writer
+    AbstractWriter writer;
+    if(parserConfig.getWriterClassName() == null) {
+
+      // if not configured, use a sensible default
+      writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
+              .withTopic(outputTopic.orElse(Constants.ENRICHMENT_TOPIC));
+
+    } else {
+      writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    }
+
+    // configure it
     writer.configure(sensorType, new ParserWriterConfiguration(configs));
 
     // create a writer handler
@@ -238,37 +257,47 @@ public class ParserTopologyBuilder {
   /**
    * Create a bolt that handles error messages.
    *
-   * @param zookeeperUrl    Kafka zookeeper URL
-   * @param brokerUrl    Kafka Broker URL
-   * @param sensorType   Type of sensor that is being consumed.
-   * @param securityProtocol   Security protocol used (if any)
+   * @param zookeeperUrl Kafka zookeeper URL
+   * @param brokerUrl Kafka Broker URL
+   * @param sensorType Type of sensor that is being consumed.
+   * @param securityProtocol Security protocol used (if any)
    * @param configs
-   * @param parserConfig
+   * @param parserConfig The sensor's parser configuration.
    * @return A Storm bolt that handles error messages.
    */
-  private static WriterBolt createErrorBolt( String zookeeperUrl
-                                           , Optional<String> brokerUrl
-                                           , String sensorType
-                                           , Optional<String> securityProtocol
-                                           , ParserConfigurations configs
-                                           , SensorParserConfig parserConfig
-                                           )
-  {
+  private static WriterBolt createErrorBolt( String zookeeperUrl,
+                                             Optional<String> brokerUrl,
+                                             String sensorType,
+                                             Optional<String> securityProtocol,
+                                             ParserConfigurations configs,
+                                             SensorParserConfig parserConfig,
+                                             String errorTopic) {
+
+    // create a writer
+    AbstractWriter writer;
+    if (parserConfig.getErrorWriterClassName() == null) {
+
+      if(errorTopic == null) {
+        errorTopic = (String) configs.getGlobalConfig().get(Constants.PARSER_ERROR_TOPIC_GLOBALS_KEY);
+      }
+
+      // if not configured, uses a sensible default
+      writer = createKafkaWriter(brokerUrl, zookeeperUrl, securityProtocol)
+              .withTopic(errorTopic)
+              .withConfigPrefix("error");
+
+    } else {
+      writer = ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    }
 
-    // create writer - if not configured uses a sensible default
-    AbstractWriter writer = parserConfig.getErrorWriterClassName() == null ?
-              createKafkaWriter( brokerUrl
-                               , zookeeperUrl
-                               , securityProtocol
-                               ).withTopic((String) configs.getGlobalConfig().get("parser.error.topic"))
-                                .withConfigPrefix("error")
-            : ReflectionUtils.createInstance(parserConfig.getWriterClassName());
+    // configure it
     writer.configure(sensorType, new ParserWriterConfiguration(configs));
 
     // create a writer handler
     WriterHandler writerHandler = createWriterHandler(writer);
 
-    return new WriterBolt(writerHandler, configs, sensorType).withErrorType(Constants.ErrorType.PARSER_ERROR);
+    return new WriterBolt(writerHandler, configs, sensorType)
+            .withErrorType(Constants.ErrorType.PARSER_ERROR);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 3824212..f60ff44 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -310,34 +310,40 @@ public class ParserTopologyCLI {
     String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
 
     /*
-    It bears mentioning why we're creating this ValueSupplier indirection here.
-    As a separation of responsibilities, the CLI class defines the order of precedence
-    for the various topological and structural properties for creating a parser.  This is
-    desirable because there are now (i.e. integration tests)
-    and may be in the future (i.e. a REST service to start parsers without using the CLI)
-    other mechanisms to construct parser topologies.  It's sensible to split those concerns..
-
-    Unfortunately, determining the structural parameters for a parser requires interacting with
-    external services (e.g. zookeeper) that are set up well within the ParserTopology class.
-    Rather than pulling the infrastructure to interact with those services out and moving it into the
-    CLI class and breaking that separation of concerns, we've created a supplier
-    indirection where are providing the logic as to how to create precedence in the CLI class
-    without owning the responsibility of constructing the infrastructure where the values are
-    necessarily supplied.
-
+     * It bears mentioning why we're creating this ValueSupplier indirection here.
+     * As a separation of responsibilities, the CLI class defines the order of precedence
+     * for the various topological and structural properties for creating a parser.  This is
+     * desirable because there are now (i.e. integration tests)
+     * and may be in the future (i.e. a REST service to start parsers without using the CLI)
+     * other mechanisms to construct parser topologies.  It's sensible to split those concerns..
+     *
+     * Unfortunately, determining the structural parameters for a parser requires interacting with
+     * external services (e.g. zookeeper) that are set up well within the ParserTopology class.
+     * Rather than pulling the infrastructure to interact with those services out and moving it into the
+     * CLI class and breaking that separation of concerns, we've created a supplier
+     * indirection where are providing the logic as to how to create precedence in the CLI class
+     * without owning the responsibility of constructing the infrastructure where the values are
+     * necessarily supplied.
+     *
      */
+
+    // kafka spout parallelism
     ValueSupplier<Integer> spoutParallelism = (parserConfig, clazz) -> {
       if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
       }
       return Optional.ofNullable(parserConfig.getSpoutParallelism()).orElse(1);
     };
+
+    // kafka spout number of tasks
     ValueSupplier<Integer> spoutNumTasks = (parserConfig, clazz) -> {
       if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
       }
       return Optional.ofNullable(parserConfig.getSpoutNumTasks()).orElse(1);
     };
+
+    // parser bolt parallelism
     ValueSupplier<Integer> parserParallelism = (parserConfig, clazz) -> {
       if(ParserOptions.PARSER_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1"));
@@ -345,6 +351,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getParserParallelism()).orElse(1);
     };
 
+    // parser bolt number of tasks
     ValueSupplier<Integer> parserNumTasks = (parserConfig, clazz) -> {
       if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1"));
@@ -352,6 +359,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getParserNumTasks()).orElse(1);
     };
 
+    // error bolt parallelism
     ValueSupplier<Integer> errorParallelism = (parserConfig, clazz) -> {
       if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) {
         return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1"));
@@ -359,6 +367,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getErrorWriterParallelism()).orElse(1);
     };
 
+    // error bolt number of tasks
     ValueSupplier<Integer> errorNumTasks = (parserConfig, clazz) -> {
       if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) {
         return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1"));
@@ -366,6 +375,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getErrorWriterNumTasks()).orElse(1);
     };
 
+    // kafka spout config
     ValueSupplier<Map> spoutConfig = (parserConfig, clazz) -> {
       if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
         return readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
@@ -373,6 +383,7 @@ public class ParserTopologyCLI {
       return Optional.ofNullable(parserConfig.getSpoutConfig()).orElse(new HashMap<>());
     };
 
+    // security protocol
     ValueSupplier<String> securityProtocol = (parserConfig, clazz) -> {
       Optional<String> sp = Optional.empty();
       if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) {
@@ -384,6 +395,7 @@ public class ParserTopologyCLI {
       return sp.orElse(Optional.ofNullable(parserConfig.getSecurityProtocol()).orElse(null));
     };
 
+    // storm configuration
     ValueSupplier<Config> stormConf = (parserConfig, clazz) -> {
       Map<String, Object> c = parserConfig.getStormConfig();
       Config finalConfig = new Config();
@@ -399,39 +411,84 @@ public class ParserTopologyCLI {
       return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig);
     };
 
-    Optional<String> outputTopic = ParserOptions.OUTPUT_TOPIC.has(cmd)?Optional.of(ParserOptions.OUTPUT_TOPIC.get(cmd)):Optional.empty();
+    // output topic
+    ValueSupplier<String> outputTopic = (parserConfig, clazz) -> {
+      String topic;
+
+      if(ParserOptions.OUTPUT_TOPIC.has(cmd)) {
+        topic = ParserOptions.OUTPUT_TOPIC.get(cmd);
+
+      } else if(parserConfig.getOutputTopic() != null) {
+        topic = parserConfig.getOutputTopic();
+
+      } else {
+        topic = Constants.ENRICHMENT_TOPIC;
+      }
+
+      return topic;
+    };
+
+    // error topic
+    ValueSupplier<String> errorTopic = (parserConfig, clazz) -> {
+      String topic;
+
+      if(parserConfig.getErrorTopic() != null) {
+        topic = parserConfig.getErrorTopic();
+
+      } else {
+        // topic will to set to the 'parser.error.topic' setting in globals when the error bolt is created
+        topic = null;
+      }
+
+      return topic;
+    };
 
-    return getParserTopology(zookeeperUrl, brokerUrl, sensorType, spoutParallelism, spoutNumTasks, parserParallelism, parserNumTasks, errorParallelism, errorNumTasks, spoutConfig, securityProtocol, stormConf, outputTopic);
+    return getParserTopology(
+            zookeeperUrl,
+            brokerUrl,
+            sensorType,
+            spoutParallelism,
+            spoutNumTasks,
+            parserParallelism,
+            parserNumTasks,
+            errorParallelism,
+            errorNumTasks,
+            spoutConfig,
+            securityProtocol,
+            stormConf,
+            outputTopic,
+            errorTopic);
   }
 
-  protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl
-                                                                  , Optional<String> brokerUrl
-                                                                  , String sensorType
-                                                                  , ValueSupplier<Integer> spoutParallelism
-                                                                  , ValueSupplier<Integer> spoutNumTasks
-                                                                  , ValueSupplier<Integer> parserParallelism
-                                                                  , ValueSupplier<Integer> parserNumTasks
-                                                                  , ValueSupplier<Integer> errorParallelism
-                                                                  , ValueSupplier<Integer> errorNumTasks
-                                                                  , ValueSupplier<Map> spoutConfig
-                                                                  , ValueSupplier<String> securityProtocol
-                                                                  , ValueSupplier<Config> stormConf
-                                                                  , Optional<String> outputTopic
-                                                                  ) throws Exception
-  {
-    return ParserTopologyBuilder.build(zookeeperUrl,
-                brokerUrl,
-                sensorType,
-                spoutParallelism,
-                spoutNumTasks,
-                parserParallelism,
-                parserNumTasks,
-                errorParallelism,
-                errorNumTasks,
-                spoutConfig,
-                securityProtocol,
-                outputTopic,
-                stormConf
+  protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl,
+                                                                    Optional<String> brokerUrl,
+                                                                    String sensorType,
+                                                                    ValueSupplier<Integer> spoutParallelism,
+                                                                    ValueSupplier<Integer> spoutNumTasks,
+                                                                    ValueSupplier<Integer> parserParallelism,
+                                                                    ValueSupplier<Integer> parserNumTasks,
+                                                                    ValueSupplier<Integer> errorParallelism,
+                                                                    ValueSupplier<Integer> errorNumTasks,
+                                                                    ValueSupplier<Map> spoutConfig,
+                                                                    ValueSupplier<String> securityProtocol,
+                                                                    ValueSupplier<Config> stormConf,
+                                                                    ValueSupplier<String> outputTopic,
+                                                                    ValueSupplier<String> errorTopic) throws Exception {
+    return ParserTopologyBuilder.build(
+            zookeeperUrl,
+            brokerUrl,
+            sensorType,
+            spoutParallelism,
+            spoutNumTasks,
+            parserParallelism,
+            parserNumTasks,
+            errorParallelism,
+            errorNumTasks,
+            spoutConfig,
+            securityProtocol,
+            outputTopic,
+            errorTopic,
+            stormConf
         );
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 63d9e52..7f40684 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -17,14 +17,6 @@
  */
 package org.apache.metron.parsers.integration.components;
 
-import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
-import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
-
-import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.ZKServerComponent;
@@ -32,24 +24,37 @@ import org.apache.metron.parsers.topology.ParserTopologyBuilder;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.KillOptions;
-import org.apache.storm.topology.TopologyBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
+import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
+
 public class ParserTopologyComponent implements InMemoryComponent {
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private Properties topologyProperties;
   private String brokerUrl;
   private String sensorType;
   private LocalCluster stormCluster;
   private String outputTopic;
+  private String errorTopic;
 
   public static class Builder {
+
     Properties topologyProperties;
     String brokerUrl;
     String sensorType;
     String outputTopic;
+    String errorTopic;
+
     public Builder withTopologyProperties(Properties topologyProperties) {
       this.topologyProperties = topologyProperties;
       return this;
@@ -68,16 +73,31 @@ public class ParserTopologyComponent implements InMemoryComponent {
       return this;
     }
 
+    public Builder withErrorTopic(String topic) {
+      this.errorTopic = topic;
+      return this;
+    }
+
     public ParserTopologyComponent build() {
-      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic);
+
+      if(sensorType == null) {
+        throw new IllegalArgumentException("The sensor type must be defined.");
+      }
+
+      if(outputTopic == null) {
+        throw new IllegalArgumentException("The output topic must be defined.");
+      }
+
+      return new ParserTopologyComponent(topologyProperties, brokerUrl, sensorType, outputTopic, errorTopic);
     }
   }
 
-  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic) {
+  public ParserTopologyComponent(Properties topologyProperties, String brokerUrl, String sensorType, String outputTopic, String errorTopic) {
     this.topologyProperties = topologyProperties;
     this.brokerUrl = brokerUrl;
     this.sensorType = sensorType;
     this.outputTopic = outputTopic;
+    this.errorTopic = errorTopic;
   }
 
   public void updateSensorType(String sensorType) {
@@ -89,24 +109,26 @@ public class ParserTopologyComponent implements InMemoryComponent {
     try {
       final Map<String, Object> stormConf = new HashMap<>();
       stormConf.put(Config.TOPOLOGY_DEBUG, true);
-      ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)
-                                                                   , Optional.ofNullable(brokerUrl)
-                                                                   , sensorType
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> 1
-                                                                   , (x,y) -> new HashMap<>()
-                                                                   , (x,y) -> null
-                                                                   , Optional.ofNullable(outputTopic)
-                                                                   , (x,y) -> {
-                                                                      Config c = new Config();
-                                                                      c.putAll(stormConf);
-                                                                      return c;
-                                                                      }
-                                                                   );
+      ParserTopologyBuilder.ParserTopology topologyBuilder = ParserTopologyBuilder.build (
+              topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY),
+              Optional.ofNullable(brokerUrl),
+              sensorType,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> 1,
+              (x,y) -> new HashMap<>(),
+              (x,y) -> null,
+              (x,y) -> outputTopic,
+              (x,y) -> errorTopic,
+              (x,y) -> {
+                Config c = new Config();
+                c.putAll(stormConf);
+                return c;
+              }
+      );
 
       stormCluster = new LocalCluster();
       stormCluster.submitTopology(sensorType, stormConf, topologyBuilder.getBuilder().createTopology());

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index 97dac5a..fcfc93b 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -217,17 +217,21 @@ public class ParserTopologyCLITest {
     private Map<String, Object> spoutConfig;
     private String securityProtocol;
     private Config stormConf;
-
-    public ParserInput( ValueSupplier<Integer> spoutParallelism
-                      , ValueSupplier<Integer> spoutNumTasks
-                      , ValueSupplier<Integer> parserParallelism
-                      , ValueSupplier<Integer> parserNumTasks
-                      , ValueSupplier<Integer> errorParallelism
-                      , ValueSupplier<Integer> errorNumTasks
-                      , ValueSupplier<Map> spoutConfig
-                      , ValueSupplier<String> securityProtocol
-                      , ValueSupplier<Config> stormConf
-                      , SensorParserConfig config
+    private String outputTopic;
+    private String errorTopic;
+
+    public ParserInput(ValueSupplier<Integer> spoutParallelism,
+                       ValueSupplier<Integer> spoutNumTasks,
+                       ValueSupplier<Integer> parserParallelism,
+                       ValueSupplier<Integer> parserNumTasks,
+                       ValueSupplier<Integer> errorParallelism,
+                       ValueSupplier<Integer> errorNumTasks,
+                       ValueSupplier<Map> spoutConfig,
+                       ValueSupplier<String> securityProtocol,
+                       ValueSupplier<Config> stormConf,
+                       ValueSupplier<String> outputTopic,
+                       ValueSupplier<String> errorTopic,
+                       SensorParserConfig config
                       )
     {
       this.spoutParallelism = spoutParallelism.get(config, Integer.class);
@@ -239,6 +243,8 @@ public class ParserTopologyCLITest {
       this.spoutConfig = spoutConfig.get(config, Map.class);
       this.securityProtocol = securityProtocol.get(config, String.class);
       this.stormConf = stormConf.get(config, Config.class);
+      this.outputTopic = outputTopic.get(config, String.class);
+      this.errorTopic = outputTopic.get(config, String.class);
     }
 
     public Integer getSpoutParallelism() {
@@ -276,30 +282,43 @@ public class ParserTopologyCLITest {
     public Config getStormConf() {
       return stormConf;
     }
+
+    public String getOutputTopic() {
+      return outputTopic;
+    }
+
+    public String getErrorTopic() {
+      return errorTopic;
+    }
   }
+
   /**
-{
-  "parserClassName": "org.apache.metron.parsers.GrokParser",
-  "sensorTopic": "squid",
-  "parserConfig": {
-    "grokPath": "/patterns/squid",
-    "patternLabel": "SQUID_DELIMITED",
-    "timestampField": "timestamp"
-  },
-  "fieldTransformations" : [
-    {
-      "transformation" : "STELLAR"
-    ,"output" : [ "full_hostname", "domain_without_subdomains" ]
-    ,"config" : {
-      "full_hostname" : "URL_TO_HOST(url)"
-      ,"domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
-                }
-    }
-                           ]
-}
+   * {
+   *   "parserClassName": "org.apache.metron.parsers.GrokParser",
+   *   "sensorTopic": "squid",
+   *   "parserConfig": {
+   *      "grokPath": "/patterns/squid",
+   *      "patternLabel": "SQUID_DELIMITED",
+   *      "timestampField": "timestamp"
+   *   },
+   *   "fieldTransformations" : [
+   *      {
+   *        "transformation" : "STELLAR",
+   *        "output" : [
+   *            "full_hostname",
+   *            "domain_without_subdomains"
+   *        ],
+   *        "config" : {
+   *            "full_hostname" : "URL_TO_HOST(url)",
+   *            "domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
+   *        }
+   *      }
+   *   ]
+   * }
    */
   @Multiline
   public static String baseConfig;
+
   private static SensorParserConfig getBaseConfig() {
     try {
       return JSONUtils.INSTANCE.load(baseConfig, SensorParserConfig.class);
@@ -600,18 +619,37 @@ public class ParserTopologyCLITest {
     final ParserInput[] parserInput = new ParserInput[]{null};
     new ParserTopologyCLI() {
       @Override
-      protected ParserTopologyBuilder.ParserTopology getParserTopology(String zookeeperUrl, Optional<String> brokerUrl, String sensorType, ValueSupplier<Integer> spoutParallelism, ValueSupplier<Integer> spoutNumTasks, ValueSupplier<Integer> parserParallelism, ValueSupplier<Integer> parserNumTasks, ValueSupplier<Integer> errorParallelism, ValueSupplier<Integer> errorNumTasks, ValueSupplier<Map> spoutConfig, ValueSupplier<String> securityProtocol, ValueSupplier<Config> stormConf, Optional<String> outputTopic) throws Exception {
-       parserInput[0] = new ParserInput( spoutParallelism
-                                        , spoutNumTasks
-                                        , parserParallelism
-                                        , parserNumTasks
-                                        , errorParallelism
-                                        , errorNumTasks
-                                        , spoutConfig
-                                        , securityProtocol
-                                        , stormConf
-                                        , config
-                                        );
+      protected ParserTopologyBuilder.ParserTopology getParserTopology(
+              String zookeeperUrl,
+              Optional<String> brokerUrl,
+              String sensorType,
+              ValueSupplier<Integer> spoutParallelism,
+              ValueSupplier<Integer> spoutNumTasks,
+              ValueSupplier<Integer> parserParallelism,
+              ValueSupplier<Integer> parserNumTasks,
+              ValueSupplier<Integer> errorParallelism,
+              ValueSupplier<Integer> errorNumTasks,
+              ValueSupplier<Map> spoutConfig,
+              ValueSupplier<String> securityProtocol,
+              ValueSupplier<Config> stormConf,
+              ValueSupplier<String> outputTopic,
+              ValueSupplier<String> errorTopic) throws Exception {
+
+       parserInput[0] = new ParserInput(
+               spoutParallelism,
+               spoutNumTasks,
+               parserParallelism,
+               parserNumTasks,
+               errorParallelism,
+               errorNumTasks,
+               spoutConfig,
+               securityProtocol,
+               stormConf,
+               outputTopic,
+               errorTopic,
+               config
+       );
+
         return null;
       }
     }.createParserTopology(cmd);

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
index 4f513be..49d7521 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/SimpleHbaseEnrichmentWriterIntegrationTest.java
@@ -30,9 +30,14 @@ import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
 import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
 import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.integration.*;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.integration.BaseIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.ReadinessState;
+import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
@@ -40,41 +45,52 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
 
 public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationTest {
 
   /**
-   {
-    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
-   ,"writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter"
-   ,"sensorTopic":"dummy"
-   ,"parserConfig":
-   {
-     "shew.table" : "dummy"
-    ,"shew.cf" : "cf"
-    ,"shew.keyColumns" : "col2"
-    ,"shew.enrichmentType" : "et"
-    ,"shew.hbaseProvider" : "org.apache.metron.hbase.mock.MockHBaseTableProvider"
-    ,"columns" : {
-                "col1" : 0
-               ,"col2" : 1
-               ,"col3" : 2
-                 }
-   }
-   }
+   * {
+   *     "parserClassName": "org.apache.metron.parsers.csv.CSVParser",
+   *     "writerClassName": "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter",
+   *     "sensorTopic": "dummy",
+   *     "outputTopic": "output",
+   *     "errorTopic": "error",
+   *     "parserConfig": {
+   *        "shew.table": "dummy",
+   *        "shew.cf": "cf",
+   *        "shew.keyColumns": "col2",
+   *        "shew.enrichmentType": "et",
+   *        "shew.hbaseProvider": "org.apache.metron.hbase.mock.MockHBaseTableProvider",
+   *        "columns" : {
+   *             "col1": 0,
+   *             "col2": 1,
+   *             "col3": 2
+   *        }
+   *     }
+   * }
    */
   @Multiline
-  public static String parserConfig;
+  public static String parserConfigJSON;
 
   @Test
   public void test() throws UnableToStartException, IOException {
     final String sensorType = "dummy";
+
+    // the input messages to parse
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
       add(Bytes.toBytes("col11,col12,col13"));
       add(Bytes.toBytes("col21,col22,col23"));
       add(Bytes.toBytes("col31,col32,col33"));
     }};
+
+    // setup external components; kafka, zookeeper
     MockHBaseTableProvider.addToCache(sensorType, "cf");
     final Properties topologyProperties = new Properties();
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
@@ -83,17 +99,20 @@ public class SimpleHbaseEnrichmentWriterIntegrationTest extends BaseIntegrationT
     }});
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
 
+    SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
+
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
             .withTopologyProperties(topologyProperties)
             .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH)
-            .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class));
+            .withParserSensorConfig(sensorType, parserConfig);
 
     ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
             .withSensorType(sensorType)
             .withTopologyProperties(topologyProperties)
-            .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+            .withBrokerUrl(kafkaComponent.getBrokerList())
+            .withOutputTopic(parserConfig.getOutputTopic())
+            .build();
 
-    //UnitTestHelper.verboseLogging();
     ComponentRunner runner = new ComponentRunner.Builder()
             .withComponent("zk", zkServerComponent)
             .withComponent("kafka", kafkaComponent)

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index 0cfaae3..cde08bc 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -45,7 +45,6 @@ import java.io.IOException;
 import java.util.*;
 
 public class WriterBoltIntegrationTest extends BaseIntegrationTest {
-  private static final String ERROR_TOPIC = "parser_error";
 
   public static class MockValidator implements FieldValidation{
 
@@ -62,48 +61,53 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     }
   }
   /**
-   {
-    "fieldValidations" : [
-        {
-          "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator"
-        }
-                         ],
-   "parser.error.topic":"parser_error"
-   }
-    */
+   * {
+   *   "fieldValidations" : [
+   *      { "validation" : "org.apache.metron.writers.integration.WriterBoltIntegrationTest$MockValidator" }
+   *   ]
+   * }
+   */
   @Multiline
   public static String globalConfig;
 
   /**
-   {
-    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser"
-   ,"sensorTopic":"dummy"
-   ,"parserConfig":
-   {
-    "columns" : {
-                "action" : 0
-               ,"dummy" : 1
-                 }
-   }
-   }
+   * {
+   *    "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
+   *    "sensorTopic": "dummy",
+   *    "outputTopic": "output",
+   *    "errorTopic": "parser_error",
+   *    "parserConfig": {
+   *        "columns" : {
+   *            "action" : 0,
+   *            "dummy" : 1
+   *        }
+   *    }
+   * }
    */
   @Multiline
-  public static String parserConfig;
+  public static String parserConfigJSON;
 
   @Test
   public void test() throws UnableToStartException, IOException, ParseException {
+
     UnitTestHelper.setLog4jLevel(CSVParser.class, org.apache.log4j.Level.FATAL);
     final String sensorType = "dummy";
+
+    SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
+
+    // the input messages to parser
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
       add(Bytes.toBytes("valid,foo"));
       add(Bytes.toBytes("invalid,foo"));
       add(Bytes.toBytes("error"));
     }};
+
+    // setup external components; zookeeper, kafka
     final Properties topologyProperties = new Properties();
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
       add(new KafkaComponent.Topic(sensorType, 1));
-      add(new KafkaComponent.Topic(ERROR_TOPIC, 1));
+      add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
       add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
     }});
     topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
@@ -111,14 +115,16 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
             .withTopologyProperties(topologyProperties)
             .withGlobalConfig(globalConfig)
-            .withParserSensorConfig(sensorType, JSONUtils.INSTANCE.load(parserConfig, SensorParserConfig.class));
+            .withParserSensorConfig(sensorType, parserConfig);
 
     ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
             .withSensorType(sensorType)
             .withTopologyProperties(topologyProperties)
-            .withBrokerUrl(kafkaComponent.getBrokerList()).build();
+            .withBrokerUrl(kafkaComponent.getBrokerList())
+            .withErrorTopic(parserConfig.getErrorTopic())
+            .withOutputTopic(parserConfig.getOutputTopic())
+            .build();
 
-    //UnitTestHelper.verboseLogging();
     ComponentRunner runner = new ComponentRunner.Builder()
             .withComponent("zk", zkServerComponent)
             .withComponent("kafka", kafkaComponent)
@@ -131,48 +137,42 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
-      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(getProcessor());
+      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(
+              getProcessor(parserConfig.getOutputTopic(), parserConfig.getErrorTopic()));
+
+      // validate the output messages
       Map<String,List<JSONObject>> outputMessages = result.getResult();
       Assert.assertEquals(2, outputMessages.size());
       Assert.assertEquals(1, outputMessages.get(Constants.ENRICHMENT_TOPIC).size());
       Assert.assertEquals("valid", outputMessages.get(Constants.ENRICHMENT_TOPIC).get(0).get("action"));
-      Assert.assertEquals(2, outputMessages.get(ERROR_TOPIC).size());
-      JSONObject invalidMessage = outputMessages.get(ERROR_TOPIC).get(0);
+      Assert.assertEquals(2, outputMessages.get(parserConfig.getErrorTopic()).size());
+
+      // validate an error message
+      JSONObject invalidMessage = outputMessages.get(parserConfig.getErrorTopic()).get(0);
       Assert.assertEquals(Constants.ErrorType.PARSER_INVALID.getType(), invalidMessage.get(Constants.ErrorFields.ERROR_TYPE.getName()));
       JSONObject rawMessage = JSONUtils.INSTANCE.load((String) invalidMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()), JSONObject.class);
       Assert.assertEquals("foo", rawMessage.get("dummy"));
       Assert.assertEquals("invalid", rawMessage.get("action"));
-      JSONObject errorMessage = outputMessages.get(ERROR_TOPIC).get(1);
+
+      // validate the next error message
+      JSONObject errorMessage = outputMessages.get(parserConfig.getErrorTopic()).get(1);
       Assert.assertEquals(Constants.ErrorType.PARSER_ERROR.getType(), errorMessage.get(Constants.ErrorFields.ERROR_TYPE.getName()));
       Assert.assertEquals("error", errorMessage.get(Constants.ErrorFields.RAW_MESSAGE.getName()));
-      // It's unclear if we need a rawMessageBytes field so commenting out for now
-      //Assert.assertTrue(Arrays.equals(listToBytes(errorMessage.get(Constants.ErrorFields.RAW_MESSAGE_BYTES.getName())), "error".getBytes()));
-    }
-    finally {
+
+    } finally {
       if(runner != null) {
         runner.stop();
       }
     }
   }
-  private static byte[] listToBytes(Object o ){
-    List<Byte> l = (List<Byte>)o;
-    byte[] ret = new byte[l.size()];
-    int i = 0;
-    for(Number b : l) {
-      ret[i++] = b.byteValue();
-    }
-    return ret;
-  }
+
   private static List<JSONObject> loadMessages(List<byte[]> outputMessages) {
     List<JSONObject> tmp = new ArrayList<>();
-    Iterables.addAll(tmp
-            , Iterables.transform(outputMessages
-                    , message -> {
+    Iterables.addAll(tmp,
+            Iterables.transform(outputMessages,
+                    message -> {
                       try {
-                        return new JSONObject(JSONUtils.INSTANCE.load(new String(message)
-                                             ,JSONUtils.MAP_SUPPLIER 
-                                             )
-                        );
+                        return new JSONObject(JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER));
                       } catch (Exception ex) {
                         throw new IllegalStateException(ex);
                       }
@@ -181,13 +181,14 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     );
     return tmp;
   }
+
   @SuppressWarnings("unchecked")
-  private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(){
+  private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(String outputTopic, String errorTopic){
 
     return new KafkaProcessor<>()
             .withKafkaComponentName("kafka")
-            .withReadTopic(Constants.ENRICHMENT_TOPIC)
-            .withErrorTopic(ERROR_TOPIC)
+            .withReadTopic(outputTopic)
+            .withErrorTopic(errorTopic)
             .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
               @Nullable
               @Override
@@ -201,7 +202,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
               public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
                 return new HashMap<String, List<JSONObject>>() {{
                   put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
-                  put(ERROR_TOPIC, loadMessages(messageSet.getErrors()));
+                  put(errorTopic, loadMessages(messageSet.getErrors()));
                 }};
               }
             });

http://git-wip-us.apache.org/repos/asf/metron/blob/3bb926df/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index f73e0f4..c4e3998 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -76,6 +76,11 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
     this.brokerUrl = brokerUrl;
   }
 
+  public KafkaWriter withBrokerUrl(String brokerUrl) {
+    this.brokerUrl = brokerUrl;
+    return this;
+  }
+
   public KafkaWriter withZkQuorum(String zkQuorum) {
     this.zkQuorum = zkQuorum;
     return this;


[02/11] metron git commit: METRON-1529 CONFIG_GET Fails to Retrieve Latest Config When Run in Zeppelin REPL (nickwallen) closes apache/metron#997

Posted by ot...@apache.org.
METRON-1529 CONFIG_GET Fails to Retrieve Latest Config When Run in Zeppelin REPL (nickwallen) closes apache/metron#997


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/37e3fd32
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/37e3fd32
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/37e3fd32

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 37e3fd32c256ddc129eb7c1363d78e9095a39748
Parents: b5bf9a9
Author: nickwallen <ni...@nickallen.org>
Authored: Wed Apr 25 09:27:18 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed Apr 25 09:27:18 2018 -0400

----------------------------------------------------------------------
 .../configuration/ConfigurationsUtils.java      | 123 +++-
 .../management/ConfigurationFunctions.java      | 564 ++++++++++---------
 .../management/ConfigurationFunctionsTest.java  | 424 ++++++++++----
 .../shell/DefaultStellarShellExecutor.java      |   4 +-
 .../common/utils/StellarProcessorUtils.java     | 135 +++--
 5 files changed, 825 insertions(+), 425 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
index a89db63..c7b39f0 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
@@ -45,6 +46,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -235,12 +237,99 @@ public class ConfigurationsUtils {
                               );
   }
 
+  /**
+   * Reads the global configuration stored in Zookeeper.
+   *
+   * @param client The Zookeeper client.
+   * @return The global configuration, if one exists.  Otherwise, null.
+   * @throws Exception
+   */
+  public static Map<String, Object> readGlobalConfigFromZookeeper(CuratorFramework client) throws Exception {
+    Map<String, Object> config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(GLOBAL.getZookeeperRoot(), client);
+    if(bytes.isPresent()) {
+      InputStream in = new ByteArrayInputStream(bytes.get());
+      config = JSONUtils.INSTANCE.load(in, JSONUtils.MAP_SUPPLIER);
+    }
+
+    return config;
+  }
+
+  /**
+   * Reads the Indexing configuration from Zookeeper.
+   *
+   * @param sensorType The type of sensor.
+   * @param client The Zookeeper client.
+   * @return The indexing configuration for the given sensor type, if one exists.  Otherwise, null.
+   * @throws Exception
+   */
+  public static Map<String, Object> readSensorIndexingConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
+    Map<String, Object> config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(INDEXING.getZookeeperRoot() + "/" + sensorType, client);
+    if(bytes.isPresent()) {
+      InputStream in = new ByteArrayInputStream(bytes.get());
+      config = JSONUtils.INSTANCE.load(in, JSONUtils.MAP_SUPPLIER);
+    }
+
+    return config;
+  }
+
+  /**
+   * Reads the Enrichment configuration from Zookeeper.
+   *
+   * @param sensorType The type of sensor.
+   * @param client The Zookeeper client.
+   * @return The Enrichment configuration for the given sensor type, if one exists. Otherwise, null.
+   * @throws Exception
+   */
   public static SensorEnrichmentConfig readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
-    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client)), SensorEnrichmentConfig.class);
+    SensorEnrichmentConfig config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client);
+    if (bytes.isPresent()) {
+      config = SensorEnrichmentConfig.fromBytes(bytes.get());
+    }
+
+    return config;
   }
 
+  /**
+   * Reads the Parser configuration from Zookeeper.
+   *
+   * @param sensorType The type of sensor.
+   * @param client The Zookeeper client.
+   * @return The Parser configuration for the given sensor type, if one exists. Otherwise, null.
+   * @throws Exception
+   */
   public static SensorParserConfig readSensorParserConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception {
-    return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(PARSER.getZookeeperRoot() + "/" + sensorType, client)), SensorParserConfig.class);
+    SensorParserConfig config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(PARSER.getZookeeperRoot() + "/" + sensorType, client);
+    if(bytes.isPresent()) {
+      config = SensorParserConfig.fromBytes(bytes.get());
+    }
+
+    return config;
+  }
+
+  /**
+   * Reads the Profiler configuration from Zookeeper.
+   *
+   * @param client The Zookeeper client.
+   * @return THe Profiler configuration.
+   * @throws Exception
+   */
+  public static ProfilerConfig readProfilerConfigFromZookeeper(CuratorFramework client) throws Exception {
+    ProfilerConfig config = null;
+
+    Optional<byte[]> bytes = readFromZookeeperSafely(PROFILER.getZookeeperRoot(), client);
+    if(bytes.isPresent()) {
+      config = ProfilerConfig.fromBytes(bytes.get());
+    }
+
+    return config;
   }
 
   public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception {
@@ -289,6 +378,36 @@ public class ConfigurationsUtils {
     }
   }
 
+  /**
+   * Read raw bytes from Zookeeper.
+   *
+   * @param path The path to the Zookeeper node to read.
+   * @param client The Zookeeper client.
+   * @return The bytes read from Zookeeper, if node exists.  Otherwise, null.
+   * @throws Exception
+   */
+  public static Optional<byte[]> readFromZookeeperSafely(String path, CuratorFramework client) throws Exception {
+    Optional<byte[]> result = Optional.empty();
+
+    try {
+      byte[] bytes = readFromZookeeper(path, client);
+      result = Optional.of(bytes);
+
+    } catch(KeeperException.NoNodeException e) {
+      LOG.debug("Zookeeper node missing; path={}", e);
+    }
+
+    return result;
+  }
+
+  /**
+   * Read raw bytes from Zookeeper.
+   *
+   * @param path The path to the Zookeeper node to read.
+   * @param client The Zookeeper client.
+   * @return The bytes read from Zookeeper.
+   * @throws Exception If the path does not exist in Zookeeper.
+   */
   public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception {
     if (client != null && client.getData() != null && path != null) {
       return client.getData().forPath(path);

http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
index af90e14..5a1281c 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java
@@ -18,26 +18,17 @@
 package org.apache.metron.management;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.TreeCache;
-import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
-import org.apache.curator.framework.recipes.cache.TreeCacheListener;
-import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ConfigurationType;
-import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.ParseException;
@@ -46,203 +37,280 @@ import org.apache.metron.stellar.dsl.StellarFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.metron.common.configuration.ConfigurationType.ENRICHMENT;
+import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL;
+import static org.apache.metron.common.configuration.ConfigurationType.INDEXING;
+import static org.apache.metron.common.configuration.ConfigurationType.PARSER;
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigBytesFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigBytesFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorIndexingConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorParserConfigFromZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorIndexingConfigToZookeeper;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorParserConfigToZookeeper;
+
+/**
+ * Defines functions that enable modification of Metron configuration values.
+ */
 public class ConfigurationFunctions {
+
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static EnumMap<ConfigurationType, Object> configMap = new EnumMap<ConfigurationType, Object>(ConfigurationType.class) {{
-    for(ConfigurationType ct : ConfigurationType.values()) {
-      put(ct, Collections.synchronizedMap(new HashMap<String, String>()));
-    }
-    put(ConfigurationType.GLOBAL, "");
-    put(ConfigurationType.PROFILER, "");
-  }};
-  private static synchronized void setupTreeCache(Context context) throws Exception {
-    try {
-      Optional<Object> treeCacheOpt = context.getCapability("treeCache");
-      if (treeCacheOpt.isPresent()) {
-        return;
-      }
+
+
+  /**
+   * Retrieves the Zookeeper client from the execution context.
+   *
+   * @param context The execution context.
+   * @return A Zookeeper client, if one exists.  Otherwise, an exception is thrown.
+   */
+  private static CuratorFramework getZookeeperClient(Context context) {
+
+    Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT, true);
+    if(clientOpt.isPresent()) {
+      return (CuratorFramework) clientOpt.get();
+
+    } else {
+      throw new IllegalStateException("Missing ZOOKEEPER_CLIENT; zookeeper connection required");
     }
-    catch(IllegalStateException ex) {
+  }
 
+  /**
+   * Get an argument from a list of arguments.
+   *
+   * @param index The index within the list of arguments.
+   * @param clazz The type expected.
+   * @param args All of the arguments.
+   * @param <T> The type of the argument expected.
+   */
+  public static <T> T getArg(int index, Class<T> clazz, List<Object> args) {
+
+    if(index >= args.size()) {
+      throw new IllegalArgumentException(format("expected at least %d argument(s), found %d", index+1, args.size()));
     }
-    Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
-    if(!clientOpt.isPresent()) {
-      throw new IllegalStateException("I expected a zookeeper client to exist and it did not.  Please connect to zookeeper.");
+
+    return ConversionUtils.convert(args.get(index), clazz);
+  }
+
+  /**
+   * Serializes a configuration object to the raw JSON.
+   *
+   * @param object The configuration object to serialize
+   * @return
+   */
+  private static String toJSON(Object object) {
+
+    if(object == null) {
+      return null;
     }
-    CuratorFramework client = (CuratorFramework) clientOpt.get();
-    TreeCache cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT);
-    TreeCacheListener listener = new TreeCacheListener() {
-      @Override
-      public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
-        if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
-          String path = event.getData().getPath();
-          byte[] data = event.getData().getData();
-          String sensor = Iterables.getLast(Splitter.on("/").split(path), null);
-          if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.PARSER);
-            sensorMap.put(sensor, new String(data));
-          } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-            configMap.put(ConfigurationType.GLOBAL, new String(data));
-          } else if (ConfigurationType.PROFILER.getZookeeperRoot().equals(path)) {
-            configMap.put(ConfigurationType.PROFILER, new String(data));
-          } else if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT);
-            sensorMap.put(sensor, new String(data));
-          } else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING);
-            sensorMap.put(sensor, new String(data));
-          }
-        }
-        else if(event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) {
-          String path = event.getData().getPath();
-          String sensor = Iterables.getLast(Splitter.on("/").split(path), null);
-          if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.PARSER);
-            sensorMap.remove(sensor);
-          }
-          else if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT);
-            sensorMap.remove(sensor);
-          }
-          else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) {
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING);
-            sensorMap.remove(sensor);
-          }
-          else if (ConfigurationType.PROFILER.getZookeeperRoot().equals(path)) {
-            configMap.put(ConfigurationType.PROFILER, null);
-          }
-          else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) {
-            configMap.put(ConfigurationType.GLOBAL, null);
-          }
-        }
-      }
-    };
-    cache.getListenable().addListener(listener);
-    cache.start();
-    for(ConfigurationType ct : ConfigurationType.values()) {
-      switch(ct) {
-        case GLOBAL:
-        case PROFILER:
-          {
-            String data = "";
-            try {
-              byte[] bytes = ConfigurationsUtils.readFromZookeeper(ct.getZookeeperRoot(), client);
-              data = new String(bytes);
-            }
-            catch(Exception ex) {
-
-            }
-            configMap.put(ct, data);
-          }
-          break;
-        case INDEXING:
-        case ENRICHMENT:
-        case PARSER:
-          {
-            List<String> sensorTypes = client.getChildren().forPath(ct.getZookeeperRoot());
-            Map<String, String> sensorMap = (Map<String, String>)configMap.get(ct);
-            for(String sensorType : sensorTypes) {
-              sensorMap.put(sensorType, new String(ConfigurationsUtils.readFromZookeeper(ct.getZookeeperRoot() + "/" + sensorType, client)));
-            }
-          }
-          break;
-      }
+
+    try {
+      return JSONUtils.INSTANCE.toJSON(object, true);
+
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
     }
-    context.addCapability("treeCache", () -> cache);
   }
 
   @Stellar(
-           namespace = "CONFIG"
-          ,name = "GET"
-          ,description = "Retrieve a Metron configuration from zookeeper."
-          ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER"
-                    , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
-                    , "emptyIfNotPresent - If true, then return an empty, minimally viable config"
-                    }
-          ,returns = "The String representation of the config in zookeeper"
-          )
+          namespace = "CONFIG",
+          name = "GET",
+          description = "Retrieve a Metron configuration from zookeeper.",
+          params = {
+                  "type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER",
+                  "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)",
+                  "emptyIfNotPresent - If true, then return an empty, minimally viable config"
+          },
+          returns = "The String representation of the config in zookeeper")
   public static class ConfigGet implements StellarFunction {
-    boolean initialized = false;
+
+    /**
+     * Whether the function has been initialized.
+     */
+    private boolean initialized = false;
+
+    /**
+     * The Zookeeper client.
+     */
+    private CuratorFramework zkClient;
+
     @Override
     public Object apply(List<Object> args, Context context) throws ParseException {
-      ConfigurationType type = ConfigurationType.valueOf((String)args.get(0));
-      boolean emptyIfNotPresent = true;
+      String result;
 
-      switch(type) {
-        case GLOBAL:
-        case PROFILER:
-          return configMap.get(type);
-        case PARSER: {
-          String sensor = (String) args.get(1);
-          if(args.size() > 2) {
-            emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
-          }
-          Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
-          String ret = sensorMap.get(sensor);
-          if (ret == null && emptyIfNotPresent ) {
-            SensorParserConfig config = new SensorParserConfig();
-            config.setSensorTopic(sensor);
-            try {
-              ret = JSONUtils.INSTANCE.toJSON(config, true);
-            } catch (JsonProcessingException e) {
-              LOG.error("Unable to serialize default object: {}", e.getMessage(), e);
-              throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
-            }
-          }
-          return ret;
-        }
-        case INDEXING: {
-          String sensor = (String) args.get(1);
-          if(args.size() > 2) {
-            emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
-          }
-          Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
-          String ret = sensorMap.get(sensor);
-          if (ret == null && emptyIfNotPresent ) {
-            Map<String, Object> config = new HashMap<>();
-            try {
-              ret = JSONUtils.INSTANCE.toJSON(config, true);
-              IndexingConfigurations.setIndex(config, sensor);
-            } catch (JsonProcessingException e) {
-              LOG.error("Unable to serialize default object: {}", e.getMessage(), e);
-              throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
-            }
-          }
-          return ret;
-        }
-        case ENRICHMENT: {
-          String sensor = (String) args.get(1);
-          if(args.size() > 2) {
-            emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class);
-          }
-          Map<String, String> sensorMap = (Map<String, String>) configMap.get(type);
-          String ret = sensorMap.get(sensor);
-          if (ret == null && emptyIfNotPresent ) {
-            SensorEnrichmentConfig config = new SensorEnrichmentConfig();
-            try {
-              ret = JSONUtils.INSTANCE.toJSON(config, true);
-            } catch (JsonProcessingException e) {
-              LOG.error("Unable to serialize default object: {}", e.getMessage(), e);
-              throw new ParseException("Unable to serialize default object: " + e.getMessage(), e);
-            }
-          }
-          return ret;
+      // the configuration type to write
+      String arg0 = getArg(0, String.class, args);
+      ConfigurationType type = ConfigurationType.valueOf(arg0);
+
+      try {
+
+        if (GLOBAL == type) {
+          result = getGlobalConfig(args);
+
+        } else if (PROFILER == type) {
+          result = getProfilerConfig(args);
+
+        } else if (ENRICHMENT == type) {
+          result = getEnrichmentConfig(args);
+
+        } else if (INDEXING == type) {
+          result = getIndexingConfig(args);
+
+        } else if (PARSER == type) {
+          result = getParserConfig(args);
+
+        } else {
+          throw new IllegalArgumentException("Unexpected configuration type: " + type);
         }
-        default:
-          throw new UnsupportedOperationException("Unable to support type " + type);
+
+      } catch(Exception e) {
+        throw new RuntimeException(e);
       }
+
+      return result;
     }
 
-    @Override
-    public void initialize(Context context) {
-      try {
-        setupTreeCache(context);
-      } catch (Exception e) {
-        LOG.error("Unable to initialize: {}", e.getMessage(), e);
+    /**
+     * Retrieves the Global configuration.
+     *
+     * @return The Global configuration.
+     * @throws Exception
+     */
+    private String getGlobalConfig(List<Object> args) throws Exception {
+
+      Map<String, Object> globals = readGlobalConfigFromZookeeper(zkClient);
+
+      // provide empty/default config if one is not present?
+      if(globals == null && emptyIfNotPresent(args)) {
+        globals = new HashMap<>();
       }
-      finally {
-        initialized = true;
+
+      return toJSON(globals);
+    }
+
+    /**
+     * Retrieves the Parser configuration.
+     *
+     * @param args The function arguments.
+     * @return The Parser configuration.
+     * @throws Exception
+     */
+    private String getParserConfig(List<Object> args) throws Exception {
+
+      // retrieve the enrichment config for the given sensor
+      String sensor = getArg(1, String.class, args);
+      SensorParserConfig sensorConfig = readSensorParserConfigFromZookeeper(sensor, zkClient);
+
+      // provide empty/default config if one is not present?
+      if(sensorConfig == null && emptyIfNotPresent(args)) {
+        sensorConfig = new SensorParserConfig();
       }
+
+     return toJSON(sensorConfig);
+    }
+
+    /**
+     * Retrieve the Enrichment configuration.
+     *
+     * @param args The function arguments.
+     * @return The Enrichment configuration as a JSON string.
+     * @throws Exception
+     */
+    private String getEnrichmentConfig(List<Object> args) throws Exception {
+
+      // retrieve the enrichment config for the given sensor
+      String sensor = getArg(1, String.class, args);
+      SensorEnrichmentConfig sensorConfig = readSensorEnrichmentConfigFromZookeeper(sensor, zkClient);
+
+      // provide empty/default config if one is not present?
+      if(sensorConfig == null && emptyIfNotPresent(args)) {
+        sensorConfig = new SensorEnrichmentConfig();
+      }
+
+      return toJSON(sensorConfig);
+    }
+
+    /**
+     * Retrieve the Indexing configuration.
+     *
+     * @param args The function arguments.
+     * @return The Indexing configuration as a JSON string.
+     * @throws Exception
+     */
+    private String getIndexingConfig(List<Object> args) throws Exception {
+
+      // retrieve the enrichment config for the given sensor
+      String sensor = getArg(1, String.class, args);
+      Map<String, Object> sensorConfig = readSensorIndexingConfigFromZookeeper(sensor, zkClient);
+
+      // provide empty/default config if one is not present?
+      if(sensorConfig == null && emptyIfNotPresent(args)) {
+        sensorConfig = Collections.emptyMap();
+      }
+
+      return toJSON(sensorConfig);
+    }
+
+    /**
+     * Retrieve the Profiler configuration.
+     *
+     * @param args The function arguments.
+     * @return The Profiler configuration as a JSON string.
+     * @throws Exception
+     */
+    private String getProfilerConfig(List<Object> args) throws Exception {
+
+      ProfilerConfig profilerConfig = readProfilerConfigFromZookeeper(zkClient);
+
+      // provide empty/default config if one is not present?
+      if(profilerConfig == null && emptyIfNotPresent(args)) {
+        profilerConfig = new ProfilerConfig();
+      }
+
+      return toJSON(profilerConfig);
+    }
+
+    /**
+     * Retrieves the 'emptyIfNotPresent' argument.
+     *
+     * <p>This determines whether a default configuration should be returned, if no
+     * configuration is not present.  This defaults to true.
+     *
+     * @param args The function arguments.
+     * @return The 'emptyIfNotPresent' argument.
+     * @throws Exception
+     */
+    private boolean emptyIfNotPresent(List<Object> args) {
+
+      boolean emptyIfNotPresent = true;
+      int lastIndex = args.size() - 1;
+
+      // expect 'emptyIfNotPresent' to always be the last boolean arg
+      if(args.size() >= 2 && args.get(lastIndex) instanceof Boolean) {
+        emptyIfNotPresent = getArg(lastIndex, Boolean.class, args);
+      }
+
+      return emptyIfNotPresent;
+    }
+
+    @Override
+    public void initialize(Context context) {
+      zkClient = getZookeeperClient(context);
     }
 
     @Override
@@ -250,91 +318,69 @@ public class ConfigurationFunctions {
       return initialized;
     }
   }
+
   @Stellar(
-           namespace = "CONFIG"
-          ,name = "PUT"
-          ,description = "Updates a Metron config to Zookeeper."
-          ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER"
-                    ,"config - The config (a string in JSON form) to update"
-                    , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
-                    }
-          ,returns = "The String representation of the config in zookeeper"
-          )
+          namespace = "CONFIG",
+          name = "PUT",
+          description = "Updates a Metron config to Zookeeper.",
+          params = {
+                  "type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER",
+                  "config - The config (a string in JSON form) to update",
+                  "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)"
+          },
+          returns = "The String representation of the config in zookeeper")
   public static class ConfigPut implements StellarFunction {
-    private CuratorFramework client;
-    private boolean initialized = false;
 
     @Override
     public Object apply(List<Object> args, Context context) throws ParseException {
-      ConfigurationType type = ConfigurationType.valueOf((String)args.get(0));
-      String config = (String)args.get(1);
-      if(config == null) {
-        return null;
-      }
-      try {
-        switch (type) {
-          case GLOBAL:
-            ConfigurationsUtils.writeGlobalConfigToZookeeper(config.getBytes(), client);
-            break;
-          case PROFILER:
-            ConfigurationsUtils.writeProfilerConfigToZookeeper(config.getBytes(), client);
-            break;
-          case ENRICHMENT:
-          {
-            String sensor = (String) args.get(2);
-            if(sensor == null) {
-              return null;
-            }
-            ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.getBytes(), client);
-          }
-          break;
-          case INDEXING:
-          {
-            String sensor = (String) args.get(2);
-            if(sensor == null) {
-              return null;
-            }
-            ConfigurationsUtils.writeSensorIndexingConfigToZookeeper(sensor, config.getBytes(), client);
-          }
-          break;
-          case PARSER:
-            {
-            String sensor = (String) args.get(2);
-              if(sensor == null) {
-              return null;
-            }
-            ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensor, config.getBytes(), client);
+
+      // the configuration type to write
+      String arg0 = getArg(0, String.class, args);
+      ConfigurationType type = ConfigurationType.valueOf(arg0);
+
+      // the configuration value to write
+      String value = getArg(1, String.class, args);
+      if(value != null) {
+
+        CuratorFramework client = getZookeeperClient(context);
+        try {
+
+          if(GLOBAL == type) {
+            writeGlobalConfigToZookeeper(value.getBytes(), client);
+
+          } else if(PROFILER == type) {
+            writeProfilerConfigToZookeeper(value.getBytes(), client);
+
+          } else if(ENRICHMENT == type) {
+            String sensor = getArg(2, String.class, args);
+            writeSensorEnrichmentConfigToZookeeper(sensor, value.getBytes(), client);
+
+          } else if(INDEXING == type) {
+            String sensor = getArg(2, String.class, args);
+            writeSensorIndexingConfigToZookeeper(sensor, value.getBytes(), client);
+
+          } else if (PARSER == type) {
+            String sensor = getArg(2, String.class, args);
+            writeSensorParserConfigToZookeeper(sensor, value.getBytes(), client);
           }
-          break;
+
+        } catch(Exception e) {
+          LOG.error("Unexpected exception: {}", e.getMessage(), e);
+          throw new ParseException(e.getMessage());
         }
       }
-      catch(Exception ex) {
-        LOG.error("Unable to put config: {}", ex.getMessage(), ex);
-        throw new ParseException("Unable to put config: " + ex.getMessage(), ex);
-      }
+
       return null;
     }
 
     @Override
     public void initialize(Context context) {
-      Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT);
-      if(!clientOpt.isPresent()) {
-        throw new IllegalStateException("I expected a zookeeper client to exist and it did not.  Please connect to zookeeper.");
-      }
-      client = (CuratorFramework) clientOpt.get();
-      try {
-        setupTreeCache(context);
-      } catch (Exception e) {
-        LOG.error("Unable to initialize: {}", e.getMessage(), e);
-      }
-      finally {
-        initialized = true;
-      }
+      // nothing to do
     }
 
     @Override
     public boolean isInitialized() {
-      return initialized;
+      return true;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
index 1920031..67e2a9d 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java
@@ -19,194 +19,393 @@ package org.apache.metron.management;
 
 import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.PosixParser;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.log4j.Level;
 import org.apache.metron.common.cli.ConfigurationManager;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.ParseException;
 import org.apache.metron.test.utils.UnitTestHelper;
-import org.json.simple.parser.JSONParser;
 import org.json.simple.JSONObject;
-import org.junit.Assert;
+import org.json.simple.parser.JSONParser;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Map;
 
 import static org.apache.metron.TestConstants.PARSER_CONFIGS_PATH;
 import static org.apache.metron.TestConstants.SAMPLE_CONFIG_PATH;
+import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL;
+import static org.apache.metron.common.configuration.ConfigurationType.PROFILER;
+import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper;
 import static org.apache.metron.management.utils.FileUtils.slurp;
 import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests the ConfigurationFunctions class.
+ */
 public class ConfigurationFunctionsTest {
+
   private static TestingServer testZkServer;
-  private static CuratorFramework client;
   private static String zookeeperUrl;
-  private Context context = new Context.Builder()
-            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
-            .build();
+  private static CuratorFramework client;
+  private static String goodGlobalConfig = slurp( SAMPLE_CONFIG_PATH+ "/global.json");
+  private static String goodTestEnrichmentConfig = slurp( SAMPLE_CONFIG_PATH + "/enrichments/test.json");
+  private static String goodBroParserConfig = slurp(PARSER_CONFIGS_PATH + "/parsers/bro.json");
+  private static String goodTestIndexingConfig = slurp( SAMPLE_CONFIG_PATH + "/indexing/test.json");
+
+  private Context context;
+  private JSONParser parser;
+
+  /**
+   * {
+   *   "profiles" : [
+   *      {
+   *        "profile" : "counter",
+   *        "foreach" : "ip_src_addr",
+   *        "init"    : { "counter" : 0 },
+   *        "update"  : { "counter" : "counter + 1" },
+   *        "result"  : "counter"
+   *      }
+   *   ],
+   *   "timestampField" : "timestamp"
+   * }
+   */
+  @Multiline
+  private static String goodProfilerConfig;
+
   @BeforeClass
-  public static void setup() throws Exception {
+  public static void setupZookeeper() throws Exception {
+
+    // zookeeper server
     testZkServer = new TestingServer(true);
     zookeeperUrl = testZkServer.getConnectString();
+
+    // zookeeper client
     client = ConfigurationsUtils.getClient(zookeeperUrl);
     client.start();
+  }
 
-    pushConfigs(SAMPLE_CONFIG_PATH);
-    pushConfigs(PARSER_CONFIGS_PATH);
+  @Before
+  public void setup() throws Exception {
 
+    context = new Context.Builder()
+            .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
+            .build();
+
+    parser = new JSONParser();
 
+    // push configs to zookeeper
+    pushConfigs(SAMPLE_CONFIG_PATH, zookeeperUrl);
+    pushConfigs(PARSER_CONFIGS_PATH, zookeeperUrl);
+    writeProfilerConfigToZookeeper(goodProfilerConfig.getBytes(), client);
   }
 
-  private static void pushConfigs(String inputPath) throws Exception {
-    String[] args = new String[]{
-            "-z", zookeeperUrl
-            , "--mode", "PUSH"
-            , "--input_dir", inputPath
-    };
-    ConfigurationManager manager = new ConfigurationManager();
-    manager.run(ConfigurationManager.ConfigurationOptions.parse(new PosixParser(), args));
+  /**
+   * Deletes a path within Zookeeper.
+   *
+   * @param path The path within Zookeeper to delete.
+   * @throws Exception
+   */
+  private void deletePath(String path) throws Exception {
+    client.delete().forPath(path);
   }
 
+  /**
+   * Transforms a String to a {@link JSONObject}.
+   *
+   * @param input The input String to transform
+   * @return A {@link JSONObject}.
+   * @throws org.json.simple.parser.ParseException
+   */
+  private JSONObject toJSONObject(String input) throws org.json.simple.parser.ParseException {
 
-  static String goodBroParserConfig = slurp(PARSER_CONFIGS_PATH + "/parsers/bro.json");
+    if(input == null) {
+      return null;
+    }
+    return (JSONObject) parser.parse(input.trim());
+  }
 
   /**
-    {
-      "sensorTopic" : "brop",
-      "parserConfig" : { },
-      "fieldTransformations" : [ ],
-      "readMetadata":false,
-      "mergeMetadata":false,
-      "parserParallelism" : 1,
-      "errorWriterParallelism" : 1,
-      "spoutNumTasks" : 1,
-      "stormConfig" : {},
-      "errorWriterNumTasks":1,
-      "spoutConfig":{},
-      "parserNumTasks":1,
-      "spoutParallelism":1
-    }
+   * Push configuration values to Zookeeper.
+   *
+   * @param inputPath The local filesystem path to the configurations.
+   * @param zookeeperUrl The URL of Zookeeper.
+   * @throws Exception
    */
-  @Multiline
-  static String defaultBropParserConfig;
+  private static void pushConfigs(String inputPath, String zookeeperUrl) throws Exception {
+
+    String[] args = new String[] {
+            "-z", zookeeperUrl,
+            "--mode", "PUSH",
+            "--input_dir", inputPath
+    };
+    CommandLine cli = ConfigurationManager.ConfigurationOptions.parse(new PosixParser(), args);
 
+    ConfigurationManager manager = new ConfigurationManager();
+    manager.run(cli);
+  }
 
+  /**
+   * The CONFIG_GET function should be able to return the Parser configuration
+   * for a given sensor.
+   */
   @Test
-  public void testParserGetHappyPath() {
+  public void testGetParser() throws Exception {
+
+    String out = (String) run("CONFIG_GET('PARSER', 'bro')", context);
 
-    Object out = run("CONFIG_GET('PARSER', 'bro')", new HashMap<>(), context);
-    Assert.assertEquals(goodBroParserConfig, out);
+    SensorParserConfig actual = SensorParserConfig.fromBytes(out.getBytes());
+    SensorParserConfig expected = SensorParserConfig.fromBytes(goodBroParserConfig.getBytes());
+    assertEquals(expected, actual);
   }
 
+  /**
+   * The CONFIG_GET function should NOT return any configuration when the
+   * Parser configuration for a given sensor is missing AND emptyIfNotPresent = false.
+   */
   @Test
-  public void testParserGetMissWithoutDefault() {
+  public void testGetParserMissWithoutDefault() {
 
-    {
-      Object out = run("CONFIG_GET('PARSER', 'brop', false)", new HashMap<>(), context);
-      Assert.assertNull(out);
-    }
+    // expect null because emptyIfNotPresent = false
+    Object out = run("CONFIG_GET('PARSER', 'sensor', false)", context);
+    assertNull(out);
   }
 
+  /**
+   * The CONFIG_GET function should return a default configuration when none
+   * currently exists.
+   */
   @Test
-  public void testParserGetMissWithDefault() throws Exception {
-    JSONObject expected = (JSONObject) new JSONParser().parse(defaultBropParserConfig);
+  public void testGetParserMissWithDefault() throws Exception {
 
+    SensorParserConfig expected = new SensorParserConfig();
     {
-      Object out = run("CONFIG_GET('PARSER', 'brop')", new HashMap<>(), context);
-      JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
-      Assert.assertEquals(expected, actual);
+      Object out = run("CONFIG_GET('PARSER', 'sensor')", context);
+      SensorParserConfig actual = SensorParserConfig.fromBytes(out.toString().getBytes());
+      assertEquals(expected, actual);
     }
     {
-      Object out = run("CONFIG_GET('PARSER', 'brop', true)", new HashMap<>(), context);
-      JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
-      Assert.assertEquals(expected, actual);
+      Object out = run("CONFIG_GET('PARSER', 'sensor', true)", context);
+      SensorParserConfig actual = SensorParserConfig.fromBytes(out.toString().getBytes());
+      assertEquals(expected, actual);
     }
   }
 
-  static String goodTestEnrichmentConfig = slurp( SAMPLE_CONFIG_PATH + "/enrichments/test.json");
+  /**
+   * The CONFIG_GET function should be able to return the Enrichment configuration
+   * for a given sensor.
+   */
+  @Test
+  public void testGetEnrichment() throws Exception {
+
+    String out = (String) run("CONFIG_GET('ENRICHMENT', 'test')", context);
+
+    SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes());
+    SensorEnrichmentConfig expected = SensorEnrichmentConfig.fromBytes(goodTestEnrichmentConfig.getBytes());
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * No default configuration should be provided in this case.
+   */
+  @Test
+  public void testGetEnrichmentMissWithoutDefault() {
+
+    // expect null because emptyIfNotPresent = false
+    Object out = run("CONFIG_GET('ENRICHMENT', 'sense', false)", context);
+    assertNull(out);
+  }
 
   /**
+   * A default empty configuration should be provided, if one does not exist.
+   */
+  @Test
+  public void testGetEnrichmentMissWithDefault() throws Exception {
+
+    // expect an empty configuration to be returned
+    SensorEnrichmentConfig expected = new SensorEnrichmentConfig();
     {
-      "enrichment" : {
-        "fieldMap" : { },
-        "fieldToTypeMap" : { },
-        "config" : { }
-      },
-      "threatIntel" : {
-        "fieldMap" : { },
-        "fieldToTypeMap" : { },
-        "config" : { },
-        "triageConfig" : {
-          "riskLevelRules" : [ ],
-          "aggregator" : "MAX",
-          "aggregationConfig" : { }
-        }
-      },
-      "configuration" : { }
+      String out = (String) run("CONFIG_GET('ENRICHMENT', 'missing-sensor')", context);
+      SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes());
+      assertEquals(expected, actual);
+    }
+    {
+      String out = (String) run("CONFIG_GET('ENRICHMENT', 'missing-sensor', true)", context);
+      SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes());
+      assertEquals(expected, actual);
     }
+  }
+
+  /**
+   * The CONFIG_GET function should be able to return the Indexing configuration
+   * for a given sensor.
    */
-  @Multiline
-  static String defaultBropEnrichmentConfig;
+  @Test
+  public void testGetIndexing() throws Exception {
 
+    String out = (String) run("CONFIG_GET('INDEXING', 'test')", context);
+
+    Map<String, Object> actual = toJSONObject(out);
+    Map<String, Object> expected = toJSONObject(goodTestIndexingConfig);
+    assertEquals(expected, actual);
+  }
 
+  /**
+   * No default configuration should be provided in this case.
+   */
   @Test
-  public void testEnrichmentGetHappyPath() {
+  public void testGetIndexingMissWithoutDefault() {
 
-    Object out = run("CONFIG_GET('ENRICHMENT', 'test')", new HashMap<>(), context);
-    Assert.assertEquals(goodTestEnrichmentConfig, out.toString().trim());
+    // expect null because emptyIfNotPresent = false
+    Object out = run("CONFIG_GET('INDEXING', 'sense', false)", context);
+    assertNull(out);
   }
 
+  /**
+   * A default empty configuration should be provided, if one does not exist.
+   */
   @Test
-  public void testEnrichmentGetMissWithoutDefault() {
+  public void testGetIndexingtMissWithDefault() throws Exception {
 
+    // expect an empty configuration to be returned
+    Map<String, Object> expected = Collections.emptyMap();
+    {
+      String out = (String) run("CONFIG_GET('INDEXING', 'missing-sensor')", context);
+      Map<String, Object> actual = toJSONObject(out);
+      assertEquals(expected, actual);
+    }
     {
-      Object out = run("CONFIG_GET('ENRICHMENT', 'brop', false)", new HashMap<>(), context);
-      Assert.assertNull(out);
+      String out = (String) run("CONFIG_GET('INDEXING', 'missing-sensor', true)", context);
+      Map<String, Object> actual = toJSONObject(out);
+      assertEquals(expected, actual);
     }
   }
 
+  /**
+   * The CONFIG_GET function should be able to return the Profiler configuration.
+   */
+  @Test
+  public void testGetProfiler() throws Exception {
+
+    String out = (String) run("CONFIG_GET('PROFILER')", context);
+
+    ProfilerConfig actual = ProfilerConfig.fromBytes(out.getBytes());
+    ProfilerConfig expected = ProfilerConfig.fromBytes(goodProfilerConfig.getBytes());
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * No default configuration should be provided in this case.
+   */
   @Test
-  public void testEnrichmentGetMissWithDefault() throws Exception {
-    JSONObject expected = (JSONObject) new JSONParser().parse(defaultBropEnrichmentConfig);
+  public void testGetProfilerMissWithoutDefault() throws Exception {
+
+    deletePath(PROFILER.getZookeeperRoot());
 
+    // expect null because emptyIfNotPresent = false
+    String out = (String) run("CONFIG_GET('PROFILER', false)", context);
+    assertNull(out);
+  }
+
+  /**
+   * A default empty configuration should be provided, if one does not exist.
+   */
+  @Test
+  public void testGetProfilerMissWithDefault() throws Exception {
+
+    // there is no profiler config in zookeeper
+    deletePath(PROFILER.getZookeeperRoot());
+
+    // expect an empty configuration to be returned
+    ProfilerConfig expected = new ProfilerConfig();
     {
-      Object out = run("CONFIG_GET('ENRICHMENT', 'brop')", new HashMap<>(), context);
-      JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
-      Assert.assertEquals(expected, actual);
+      String out = (String) run("CONFIG_GET('PROFILER', true)", context);
+      ProfilerConfig actual = ProfilerConfig.fromJSON(out);
+      assertEquals(expected, actual);
     }
     {
-      Object out = run("CONFIG_GET('ENRICHMENT', 'brop', true)", new HashMap<>(), context);
-      JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim());
-      Assert.assertEquals(expected, actual);
+      String out = (String) run("CONFIG_GET('PROFILER')", context);
+      ProfilerConfig actual = ProfilerConfig.fromJSON(out);
+      assertEquals(expected, actual);
     }
   }
 
-  static String goodGlobalConfig = slurp( SAMPLE_CONFIG_PATH+ "/global.json");
+  @Test
+  public void testGetGlobal() throws Exception {
+
+    String out = (String) run("CONFIG_GET('GLOBAL')", context);
+
+    Map<String, Object> actual = toJSONObject(out);
+    Map<String, Object> expected = toJSONObject(goodGlobalConfig);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * No default configuration should be provided in this case.
+   */
+  @Test
+  public void testGetGlobalMissWithoutDefault() throws Exception {
+
+    // there is no global config in zookeeper
+    deletePath(GLOBAL.getZookeeperRoot());
+
+    // expect null because emptyIfNotPresent = false
+    Object out = run("CONFIG_GET('GLOBAL', false)", context);
+    assertNull(out);
+  }
 
+  /**
+   * A default empty configuration should be provided, if one does not exist.
+   */
   @Test
-  public void testGlobalGet() {
+  public void testGetGlobalMissWithDefault() throws Exception {
+
+    // there is no global config in zookeeper
+    deletePath(GLOBAL.getZookeeperRoot());
 
-    Object out = run("CONFIG_GET('GLOBAL')", new HashMap<>(), context);
-    Assert.assertEquals(goodGlobalConfig, out.toString().trim());
+    // expect an empty configuration to be returned
+    Map<String, Object> expected = Collections.emptyMap();
+    {
+      String out = (String) run("CONFIG_GET('GLOBAL')", context);
+      Map<String, Object> actual = toJSONObject(out);
+      assertEquals(expected, actual);
+    }
+    {
+      String out = (String) run("CONFIG_GET('GLOBAL', true)", context);
+      Map<String, Object> actual = toJSONObject(out);
+      assertEquals(expected, actual);
+    }
   }
 
   @Test
-  public void testGlobalPut() {
+  public void testPutGlobal() throws Exception {
+
+    String out = (String) run("CONFIG_GET('GLOBAL')", context);
 
-    Object out = run("CONFIG_GET('GLOBAL')", new HashMap<>(), context);
-    Assert.assertEquals(goodGlobalConfig, out.toString().trim());
+    Map<String, Object> actual = toJSONObject(out);
+    Map<String, Object> expected = toJSONObject(goodGlobalConfig);
+    assertEquals(expected, actual);
   }
 
   @Test(expected=ParseException.class)
-  public void testGlobalPutBad() {
+  public void testPutGlobalBad() {
     {
       UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
       try {
-        run("CONFIG_PUT('GLOBAL', 'foo bar')", new HashMap<>(), context);
+        run("CONFIG_PUT('GLOBAL', 'foo bar')", context);
       } catch(ParseException e) {
         UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.ERROR);
         throw e;
@@ -215,23 +414,23 @@ public class ConfigurationFunctionsTest {
   }
 
   @Test
-  public void testIndexingPut() throws InterruptedException {
-    String brop= (String) run("CONFIG_GET('INDEXING', 'testIndexingPut')", new HashMap<>(), context);
+  public void testPutIndexing() throws InterruptedException {
+    String brop= (String) run("CONFIG_GET('INDEXING', 'testIndexingPut')", context);
     run("CONFIG_PUT('INDEXING', config, 'testIndexingPut')", ImmutableMap.of("config", brop), context);
     boolean foundMatch = false;
     for(int i = 0;i < 10 && !foundMatch;++i) {
-      String bropNew = (String) run("CONFIG_GET('INDEXING', 'testIndexingPut', false)", new HashMap<>(), context);
+      String bropNew = (String) run("CONFIG_GET('INDEXING', 'testIndexingPut', false)", context);
       foundMatch =  brop.equals(bropNew);
       if(foundMatch) {
         break;
       }
       Thread.sleep(2000);
     }
-    Assert.assertTrue(foundMatch);
+    assertTrue(foundMatch);
   }
 
   @Test(expected= ParseException.class)
-  public void testIndexingPutBad() throws InterruptedException {
+  public void testPutIndexingBad() throws InterruptedException {
     {
       {
         UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
@@ -246,23 +445,26 @@ public class ConfigurationFunctionsTest {
   }
 
   @Test
-  public void testEnrichmentPut() throws InterruptedException {
-    String brop= (String) run("CONFIG_GET('ENRICHMENT', 'testEnrichmentPut')", new HashMap<>(), context);
-    run("CONFIG_PUT('ENRICHMENT', config, 'testEnrichmentPut')", ImmutableMap.of("config", brop), context);
+  public void testPutEnrichment() throws InterruptedException {
+    String config = (String) run("CONFIG_GET('ENRICHMENT', 'sensor')", context);
+    assertNotNull(config);
+
+    run("CONFIG_PUT('ENRICHMENT', config, 'sensor')", ImmutableMap.of("config", config), context);
+
     boolean foundMatch = false;
     for(int i = 0;i < 10 && !foundMatch;++i) {
-      String bropNew = (String) run("CONFIG_GET('ENRICHMENT', 'testEnrichmentPut', false)", new HashMap<>(), context);
-      foundMatch =  brop.equals(bropNew);
+      String newConfig = (String) run("CONFIG_GET('ENRICHMENT', 'sensor', false)", context);
+      foundMatch = config.equals(newConfig);
       if(foundMatch) {
         break;
       }
       Thread.sleep(2000);
     }
-    Assert.assertTrue(foundMatch);
+    assertTrue(foundMatch);
   }
 
   @Test(expected= ParseException.class)
-  public void testEnrichmentPutBad() throws InterruptedException {
+  public void testPutEnrichmentBad() throws InterruptedException {
     {
       {
         UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
@@ -277,23 +479,23 @@ public class ConfigurationFunctionsTest {
   }
 
   @Test
-  public void testParserPut() throws InterruptedException {
-    String brop= (String) run("CONFIG_GET('PARSER', 'testParserPut')", new HashMap<>(), context);
+  public void testPutParser() throws InterruptedException {
+    String brop= (String) run("CONFIG_GET('PARSER', 'testParserPut')", context);
     run("CONFIG_PUT('PARSER', config, 'testParserPut')", ImmutableMap.of("config", brop), context);
     boolean foundMatch = false;
     for(int i = 0;i < 10 && !foundMatch;++i) {
-      String bropNew = (String) run("CONFIG_GET('PARSER', 'testParserPut', false)", new HashMap<>(), context);
+      String bropNew = (String) run("CONFIG_GET('PARSER', 'testParserPut', false)", context);
       foundMatch =  brop.equals(bropNew);
       if(foundMatch) {
         break;
       }
       Thread.sleep(2000);
     }
-    Assert.assertTrue(foundMatch);
+    assertTrue(foundMatch);
   }
 
   @Test(expected= ParseException.class)
-  public void testParserPutBad() throws InterruptedException {
+  public void testPutParserBad() throws InterruptedException {
     {
       UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL);
       try {

http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
index 781a0cf..352ae2b 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java
@@ -52,7 +52,6 @@ import java.io.ByteArrayInputStream;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -342,15 +341,18 @@ public class DefaultStellarShellExecutor implements StellarShellExecutor {
    * @param zkClient An optional Zookeeper client.
    */
   private Context createContext(Properties properties, Optional<CuratorFramework> zkClient) throws Exception {
+
     Context.Builder contextBuilder = new Context.Builder();
     Map<String, Object> globals;
     if (zkClient.isPresent()) {
+      LOG.debug("Zookeeper client present; fetching globals from Zookeeper.");
 
       // fetch globals from zookeeper
       globals = fetchGlobalConfig(zkClient.get());
       contextBuilder.with(ZOOKEEPER_CLIENT, () -> zkClient.get());
 
     } else {
+      LOG.debug("No Zookeeper client; initializing empty globals.");
 
       // use empty globals to allow a user to '%define' their own
       globals = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
index 5912657..d5f267e 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java
@@ -18,17 +18,18 @@
 
 package org.apache.metron.stellar.common.utils;
 
+import com.google.common.collect.ImmutableList;
+import org.apache.metron.stellar.common.StellarPredicateProcessor;
+import org.apache.metron.stellar.common.StellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.DefaultVariableResolver;
 import org.apache.metron.stellar.dsl.MapVariableResolver;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.metron.stellar.dsl.VariableResolver;
-import com.google.common.collect.ImmutableList;
-import org.apache.metron.stellar.common.StellarPredicateProcessor;
-import org.apache.metron.stellar.common.StellarProcessor;
 import org.junit.Assert;
 
 import java.util.AbstractMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Spliterators;
@@ -39,39 +40,76 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+/**
+ * Utilities for executing and validating Stellar expressions.
+ */
 public class StellarProcessorUtils {
 
-    /**
-     * This utility class is intended for use while unit testing Stellar operators.
-     * It is included in the "main" code so third-party operators will not need
-     * a test dependency on Stellar's test-jar.
-     *
-     * This class ensures the basic contract of a stellar expression is adhered to:
-     * 1. Validate works on the expression
-     * 2. The output can be serialized and deserialized properly
-     *
-     * @param rule
-     * @param variables
-     * @param context
-     * @return ret
-     */
-    public static Object run(String rule, Map<String, Object> variables, Context context) {
-        StellarProcessor processor = new StellarProcessor();
-        Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
-        Object ret = processor.parse(rule, new DefaultVariableResolver(x -> variables.get(x),x-> variables.containsKey(x)), StellarFunctions.FUNCTION_RESOLVER(), context);
-        byte[] raw = SerDeUtils.toBytes(ret);
-        Object actual = SerDeUtils.fromBytes(raw, Object.class);
-        Assert.assertEquals(ret, actual);
-        return ret;
-    }
+  /**
+   * Execute and validate a Stellar expression.
+   *
+   * <p>This is intended for use while unit testing Stellar expressions.  This ensures that the expression
+   * validates successfully and produces a result that can be serialized correctly.
+   *
+   * @param expression The expression to execute.
+   * @param variables The variables to expose to the expression.
+   * @param context The execution context.
+   * @return The result of executing the expression.
+   */
+  public static Object run(String expression, Map<String, Object> variables, Context context) {
+
+    // validate the expression
+    StellarProcessor processor = new StellarProcessor();
+    Assert.assertTrue("Invalid expression; expr=" + expression,
+            processor.validate(expression, context));
+
+    // execute the expression
+    Object ret = processor.parse(
+            expression,
+            new DefaultVariableResolver(x -> variables.get(x), x -> variables.containsKey(x)),
+            StellarFunctions.FUNCTION_RESOLVER(),
+            context);
+
+    // ensure the result can be serialized/deserialized
+    byte[] raw = SerDeUtils.toBytes(ret);
+    Object actual = SerDeUtils.fromBytes(raw, Object.class);
+    Assert.assertEquals(ret, actual);
+
+    return ret;
+  }
+
+  /**
+   * Execute and validate a Stellar expression.
+   *
+   * <p>This is intended for use while unit testing Stellar expressions.  This ensures that the expression
+   * validates successfully and produces a result that can be serialized correctly.
+   *
+   * @param expression The expression to execute.
+   * @param variables The variables to expose to the expression.
+   * @return The result of executing the expression.
+   */
+  public static Object run(String expression, Map<String, Object> variables) {
+    return run(expression, variables, Context.EMPTY_CONTEXT());
+  }
 
-  public static Object run(String rule, Map<String, Object> variables) {
-    return run(rule, variables, Context.EMPTY_CONTEXT());
+  /**
+   * Execute and validate a Stellar expression.
+   *
+   * <p>This is intended for use while unit testing Stellar expressions.  This ensures that the expression
+   * validates successfully and produces a result that can be serialized correctly.
+   *
+   * @param expression The expression to execute.
+   * @param context The execution context.
+   * @return The result of executing the expression.
+   */
+  public static Object run(String expression, Context context) {
+    return run(expression, Collections.emptyMap(), context);
   }
 
-  public static void validate(String rule, Context context) {
+  public static void validate(String expression, Context context) {
     StellarProcessor processor = new StellarProcessor();
-    Assert.assertTrue(rule + " not valid.", processor.validate(rule, context));
+    Assert.assertTrue("Invalid expression; expr=" + expression,
+            processor.validate(expression, context));
   }
 
   public static void validate(String rule) {
@@ -101,19 +139,18 @@ public class StellarProcessorUtils {
   }
 
   public static void runWithArguments(String function, List<Object> arguments, Object expected) {
-    Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> StreamSupport.stream(new XRange(arguments.size()), false)
-            .map( i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, arguments.get(i)));
+    Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> StreamSupport
+            .stream(new XRange(arguments.size()), false)
+            .map(i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, arguments.get(i)));
 
-    String args = kvStream.get().map( kv -> kv.getKey())
-                                .collect(Collectors.joining(","));
+    String args = kvStream.get().map(kv -> kv.getKey()).collect(Collectors.joining(","));
     Map<String, Object> variables = kvStream.get().collect(Collectors.toMap(kv -> kv.getKey(), kv -> kv.getValue()));
-    String stellarStatement =  function + "(" + args + ")";
+    String stellarStatement = function + "(" + args + ")";
     String reason = stellarStatement + " != " + expected + " with variables: " + variables;
 
-    if(expected instanceof Double) {
-      Assert.assertEquals(reason, (Double)expected, (Double)run(stellarStatement, variables), 1e-6);
-    }
-    else {
+    if (expected instanceof Double) {
+      Assert.assertEquals(reason, (Double) expected, (Double) run(stellarStatement, variables), 1e-6);
+    } else {
       Assert.assertEquals(reason, expected, run(stellarStatement, variables));
     }
   }
@@ -135,10 +172,9 @@ public class StellarProcessorUtils {
     @Override
     public boolean tryAdvance(IntConsumer action) {
       boolean isDone = i >= end;
-      if(isDone) {
+      if (isDone) {
         return false;
-      }
-      else {
+      } else {
         action.accept(i);
         i++;
         return true;
@@ -148,25 +184,20 @@ public class StellarProcessorUtils {
     /**
      * {@inheritDoc}
      *
-     * @param action
-     * to {@code IntConsumer} and passed to
-     * {@link #tryAdvance(IntConsumer)}; otherwise
-     * the action is adapted to an instance of {@code IntConsumer}, by
-     * boxing the argument of {@code IntConsumer}, and then passed to
-     * {@link #tryAdvance(IntConsumer)}.
+     * @param action to {@code IntConsumer} and passed to {@link #tryAdvance(IntConsumer)};
+     *     otherwise the action is adapted to an instance of {@code IntConsumer}, by boxing the
+     *     argument of {@code IntConsumer}, and then passed to {@link #tryAdvance(IntConsumer)}.
      */
     @Override
     public boolean tryAdvance(Consumer<? super Integer> action) {
       boolean isDone = i >= end;
-      if(isDone) {
+      if (isDone) {
         return false;
-      }
-      else {
+      } else {
         action.accept(i);
         i++;
         return true;
       }
     }
   }
-
 }


[03/11] metron git commit: METRON-1520: Add caching for stellar field transformations closes apache/incubator-metron#990

Posted by ot...@apache.org.
METRON-1520: Add caching for stellar field transformations closes apache/incubator-metron#990


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1c5435cc
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1c5435cc
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1c5435cc

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 1c5435ccbe96c03e59c6a18d681da43561769dba
Parents: 37e3fd3
Author: cstella <ce...@gmail.com>
Authored: Wed Apr 25 11:48:44 2018 -0400
Committer: cstella <ce...@gmail.com>
Committed: Wed Apr 25 11:48:44 2018 -0400

----------------------------------------------------------------------
 metron-platform/Performance-tuning-guide.md     |  13 ++
 .../configuration/SensorParserConfig.java       |  15 ++
 .../transformation/StellarTransformation.java   |   3 +-
 .../StellarTransformationTest.java              |  30 ++++
 metron-platform/metron-parsers/README.md        |  13 ++
 .../apache/metron/parsers/bolt/ParserBolt.java  |  15 +-
 .../stellar/common/CachingStellarProcessor.java | 144 +++++++++++++++++++
 .../org/apache/metron/stellar/dsl/Context.java  |  43 +++++-
 .../common/CachingStellarProcessorTest.java     | 104 ++++++++++++++
 9 files changed, 371 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/Performance-tuning-guide.md
----------------------------------------------------------------------
diff --git a/metron-platform/Performance-tuning-guide.md b/metron-platform/Performance-tuning-guide.md
index e2d1ae2..c2d19d6 100644
--- a/metron-platform/Performance-tuning-guide.md
+++ b/metron-platform/Performance-tuning-guide.md
@@ -60,6 +60,19 @@ parallelism will leave you with idle consumers since Kafka limits the max number
 important because Kafka has certain ordering guarantees for message delivery per partition that would not be possible if more than
 one consumer in a given consumer group were able to read from that partition.
 
+## Sensor Topology Tuning Suggestions
+
+If you are using stellar field transformations in your sensors, by default, stellar expressions
+are not cached.  Sensors that use stellar field transformations by see a performance
+boost by turning on caching via setting the `cacheConfig`
+[property](metron-parsers#parser_configuration).
+This is beneficial if your transformations:
+
+* Are complex (e.g. `ENRICHMENT_GET` calls or other high latency calls)
+* All Yield the same results for the same inputs ( caching is either off or applied to all transformations)
+  * If any of your transformations are non-deterministic, caching should not be used as it will result in the likelihood of incorrect results being returned.
+
+
 ## Component Tuning Levers
 
 ### High Level Overview

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
index 2d0ccd8..d347481 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/SensorParserConfig.java
@@ -45,11 +45,26 @@ public class SensorParserConfig implements Serializable {
   private Integer parserNumTasks = 1;
   private Integer errorWriterParallelism = 1;
   private Integer errorWriterNumTasks = 1;
+  private Map<String, Object> cacheConfig = new HashMap<>();
   private Map<String, Object> spoutConfig = new HashMap<>();
   private String securityProtocol = null;
   private Map<String, Object> stormConfig = new HashMap<>();
 
   /**
+   * Cache config for stellar field transformations.
+   * * stellar.cache.maxSize - The maximum number of elements in the cache.
+   * * stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes).
+   * @return
+   */
+  public Map<String, Object> getCacheConfig() {
+    return cacheConfig;
+  }
+
+  public void setCacheConfig(Map<String, Object> cacheConfig) {
+    this.cacheConfig = cacheConfig;
+  }
+
+  /**
    * Return the number of workers for the topology.  This property will be used for the parser unless overridden on the CLI.
    * @return
    */

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
index 2a22e21..bb7501d 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/StellarTransformation.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.common.field.transformation;
 
+import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.MapVariableResolver;
 import org.apache.metron.stellar.dsl.StellarFunctions;
@@ -40,7 +41,7 @@ public class StellarTransformation implements FieldTransformation {
     Set<String> outputs = new HashSet<>(outputField);
     MapVariableResolver resolver = new MapVariableResolver(ret, intermediateVariables, input);
     resolver.add(sensorConfig);
-    StellarProcessor processor = new StellarProcessor();
+    StellarProcessor processor = new CachingStellarProcessor();
     for(Map.Entry<String, Object> kv : fieldMappingConfig.entrySet()) {
       String oField = kv.getKey();
       Object transformObj = kv.getValue();

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
index 0a3cbb0..fc91844 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/StellarTransformationTest.java
@@ -18,19 +18,49 @@
 
 package org.apache.metron.common.field.transformation;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.FieldTransformer;
 import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 
+@RunWith(Parameterized.class)
 public class StellarTransformationTest {
+  Context context;
+  public StellarTransformationTest(Cache<CachingStellarProcessor.Key, Object> cache) {
+    if(cache == null) {
+      context = Context.EMPTY_CONTEXT();
+    }
+    else {
+      context = new Context.Builder().with(Context.Capabilities.CACHE, () -> cache).build();
+    }
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+            new Object[][] {
+                     { CachingStellarProcessor.createCache(ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 10)) }
+                   , { CachingStellarProcessor.createCache(ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 1)) }
+                   , { CachingStellarProcessor.createCache(ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 0)) }
+                   , { null }
+                           }
+                        );
+  }
+
   /**
    {
     "fieldTransformations" : [

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 6b9d62e..1d2d834 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -174,6 +174,19 @@ then it is assumed to be a regex and will match any topic matching the pattern (
 * `spoutConfig` : A map representing a custom spout config (this is a map). This can be overridden on the command line.
 * `securityProtocol` : The security protocol to use for reading from kafka (this is a string).  This can be overridden on the command line and also specified in the spout config via the `security.protocol` key.  If both are specified, then they are merged and the CLI will take precedence.
 * `stormConfig` : The storm config to use (this is a map).  This can be overridden on the command line.  If both are specified, they are merged with CLI properties taking precedence.
+* `cacheConfig` : Cache config for stellar field transformations.   This configures a least frequently used cache.  This is a map with the following keys.  If not explicitly configured (the default), then no cache will be used.
+  * `stellar.cache.maxSize` - The maximum number of elements in the cache. Default is to not use a cache.
+  * `stellar.cache.maxTimeRetain` - The maximum amount of time an element is kept in the cache (in minutes). Default is to not use a cache.
+
+  Example of a cache config to contain at max `20000` stellar expressions for at most `20` minutes.:
+```
+{
+  "cacheConfig" : {
+    "stellar.cache.maxSize" : 20000,
+    "stellar.cache.maxTimeRetain" : 20
+  }
+}
+```
 
 The `fieldTransformations` is a complex object which defines a
 transformation which can be done to a message.  This transformation can 

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index e996f14..dd59355 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -31,6 +31,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
+
+import com.github.benmanes.caffeine.cache.Cache;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
@@ -45,6 +47,7 @@ import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.stellar.common.CachingStellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunctions;
 import org.apache.storm.task.OutputCollector;
@@ -67,6 +70,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   private WriterHandler writer;
   private Context stellarContext;
   private transient MessageGetStrategy messageGetStrategy;
+  private transient Cache<CachingStellarProcessor.Key, Object> cache;
   public ParserBolt( String zookeeperUrl
                    , String sensorType
                    , MessageParser<JSONObject> parser
@@ -94,6 +98,9 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     super.prepare(stormConf, context, collector);
     messageGetStrategy = MessageGetters.DEFAULT_BYTES_FROM_POSITION.get();
     this.collector = collector;
+    if(getSensorParserConfig() != null) {
+      cache = CachingStellarProcessor.createCache(getSensorParserConfig().getCacheConfig());
+    }
     initializeStellar();
     if(getSensorParserConfig() != null && filter == null) {
       getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
@@ -119,11 +126,15 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   }
 
   protected void initializeStellar() {
-    this.stellarContext = new Context.Builder()
+    Context.Builder builder = new Context.Builder()
                                 .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client)
                                 .with(Context.Capabilities.GLOBAL_CONFIG, () -> getConfigurations().getGlobalConfig())
                                 .with(Context.Capabilities.STELLAR_CONFIG, () -> getConfigurations().getGlobalConfig())
-                                .build();
+                                ;
+    if(cache != null) {
+      builder = builder.with(Context.Capabilities.CACHE, () -> cache);
+    }
+    this.stellarContext = builder.build();
     StellarFunctions.initialize(stellarContext);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
new file mode 100644
index 0000000..36e6579
--- /dev/null
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.common;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.VariableResolver;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The Caching Stellar Processor is a stellar processor that optionally fronts stellar with an expression-by-expression
+ * LFU cache.
+ */
+public class CachingStellarProcessor extends StellarProcessor {
+  private static ThreadLocal<Map<String, Set<String>> > variableCache = ThreadLocal.withInitial(() -> new HashMap<>());
+  public static String MAX_CACHE_SIZE_PARAM = "stellar.cache.maxSize";
+  public static String MAX_TIME_RETAIN_PARAM = "stellar.cache.maxTimeRetain";
+
+  public static class Key {
+    private String expression;
+    private Map<String, Object> input;
+
+    public Key(String expression, Map<String, Object> input) {
+      this.expression = expression;
+      this.input = input;
+    }
+
+    public String getExpression() {
+      return expression;
+    }
+
+    public Map<String, Object> getInput() {
+      return input;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Key key = (Key) o;
+
+      if (getExpression() != null ? !getExpression().equals(key.getExpression()) : key.getExpression() != null)
+        return false;
+      return getInput() != null ? getInput().equals(key.getInput()) : key.getInput() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = getExpression() != null ? getExpression().hashCode() : 0;
+      result = 31 * result + (getInput() != null ? getInput().hashCode() : 0);
+      return result;
+    }
+  }
+
+
+  /**
+   * Parses and evaluates the given Stellar expression, {@code expression}.  Results will be taken from a cache if possible.
+   *
+   * @param expression             The Stellar expression to parse and evaluate.
+   * @param variableResolver The {@link VariableResolver} to determine values of variables used in the Stellar expression, {@code expression}.
+   * @param functionResolver The {@link FunctionResolver} to determine values of functions used in the Stellar expression, {@code expression}.
+   * @param context          The context used during validation.
+   * @return The value of the evaluated Stellar expression, {@code expression}.
+   */
+  @Override
+  public Object parse(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) {
+    Optional<Object> cacheOpt = context.getCapability(Context.Capabilities.CACHE, false);
+    if(cacheOpt.isPresent()) {
+      Cache<Key, Object> cache = (Cache<Key, Object>) cacheOpt.get();
+      Key k = toKey(expression, variableResolver);
+      return cache.get(k, x -> parseUncached(x.expression, variableResolver, functionResolver, context));
+    }
+    else {
+      return parseUncached(expression, variableResolver, functionResolver, context);
+    }
+  }
+
+  protected Object parseUncached(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) {
+    return super.parse(expression, variableResolver, functionResolver, context);
+  }
+
+  private Key toKey(String expression, VariableResolver resolver) {
+    Set<String> variablesUsed = variableCache.get().computeIfAbsent(expression, this::variablesUsed);
+    Map<String, Object> input = new HashMap<>();
+    for(String v : variablesUsed) {
+      input.computeIfAbsent(v, resolver::resolve);
+    }
+    return new Key(expression, input);
+  }
+
+  /**
+   * Create a cache given a config.  Note that if the cache size is <= 0, then no cache will be returned.
+   * @param config
+   * @return A cache.
+   */
+  public static Cache<Key, Object> createCache(Map<String, Object> config) {
+    if(config == null) {
+      return null;
+    }
+    Long maxSize = getParam(config, MAX_CACHE_SIZE_PARAM, null, Long.class);
+    Integer maxTimeRetain = getParam(config, MAX_TIME_RETAIN_PARAM, null, Integer.class);
+    if(maxSize == null || maxTimeRetain == null || maxSize <= 0 || maxTimeRetain <= 0) {
+      return null;
+    }
+    return Caffeine.newBuilder()
+                   .maximumSize(maxSize)
+                   .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES)
+                   .build();
+  }
+
+  private static <T> T getParam(Map<String, Object> config, String key, T defaultVal, Class<T> clazz) {
+    Object o = config.get(key);
+    if(o == null) {
+      return defaultVal;
+    }
+    T ret = ConversionUtils.convert(o, clazz);
+    return ret == null?defaultVal:ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
index 9568a05..8a477c4 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/Context.java
@@ -30,12 +30,43 @@ public class Context implements Serializable {
   
   public enum Capabilities {
       HBASE_PROVIDER
-    , GLOBAL_CONFIG
-    , ZOOKEEPER_CLIENT
-    , SERVICE_DISCOVERER
-    , STELLAR_CONFIG
-    , CONSOLE
-    , SHELL_VARIABLES
+    ,
+    /**
+     * This capability indicates that the global config is available.
+     */
+    GLOBAL_CONFIG
+    ,
+    /**
+     * This capability indicates that a zookeeper client (i.e. a Curator client, specifically) is available.
+     */
+    ZOOKEEPER_CLIENT
+    ,
+    /**
+     * This capability indicates that a MaaS service discoverer is available.
+     */
+    SERVICE_DISCOVERER
+    ,
+    /**
+     * This capability indicates that a map configuring stellar is available.  Generally this is done within the global config
+     * inside of storm, but may be sourced elsewhere (e.g. the CLI when running the REPL).
+     */
+    STELLAR_CONFIG
+    ,
+    /**
+     * This capability indicates that the Console object is available.  This is available when run via the CLI (e.g. from the REPL).
+     */
+    CONSOLE
+    ,
+    /**
+     * This capability indicates that shell variables are available.  This is available when run via the CLI (e.g. from the REPL).
+     */
+    SHELL_VARIABLES
+    ,
+    /**
+     * This capability indicates that the StellarProcessor should use a Caffeine cache to cache expression -> results.  If an expression
+     * is in the cache, then the cached result will be returned instead of recomputing.
+     */
+    CACHE
   }
 
   public enum ActivityType {

http://git-wip-us.apache.org/repos/asf/metron/blob/1c5435cc/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
new file mode 100644
index 0000000..94421de
--- /dev/null
+++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.stellar.common;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.google.common.collect.ImmutableMap;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.MapVariableResolver;
+import org.apache.metron.stellar.dsl.StellarFunctions;
+import org.apache.metron.stellar.dsl.VariableResolver;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CachingStellarProcessorTest {
+
+  private static Map<String, Object> fields = new HashMap<String, Object>() {{
+      put("name", "blah");
+    }};
+
+  @Test
+  public void testNoCaching() throws Exception {
+    //no caching, so every expression is a cache miss.
+    Assert.assertEquals(2, countMisses(2, Context.EMPTY_CONTEXT(), "TO_UPPER(name)"));
+    //Ensure the correct result is returned.
+    Assert.assertEquals("BLAH", evaluateExpression(Context.EMPTY_CONTEXT(), "TO_UPPER(name)"));
+  }
+
+  @Test
+  public void testCaching() throws Exception {
+    Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache(
+                                                 ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 2
+                                                                ,CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, 10
+                                                                )
+                                                                           );
+    Context context = new Context.Builder()
+                                 .with( Context.Capabilities.CACHE , () -> cache )
+                                 .build();
+    //running the same expression twice should hit the cache on the 2nd time and only yield one miss
+    Assert.assertEquals(1, countMisses(2, context, "TO_UPPER(name)"));
+
+    //Ensure the correct result is returned.
+    Assert.assertEquals("BLAH", evaluateExpression(context, "TO_UPPER(name)"));
+
+    //running the same expression 20 more times should pull from the cache
+    Assert.assertEquals(0, countMisses(20, context, "TO_UPPER(name)"));
+
+    //Now we are running 4 distinct operations with a cache size of 2.  The cache has 1 element in it before we start:
+    //  TO_LOWER(name) - miss (brand new), cache is full
+    //  TO_UPPER(name) - hit, cache is full
+    //  TO_UPPER('foo') - miss (brand new), cache is still full, but TO_LOWER is evicted as the least frequently used
+    //  JOIN... - miss (brand new), cache is still full, but TO_UPPER('foo') is evicted as the least frequently used
+    //this pattern repeats a 2nd time to add another 3 cache misses, totalling 6.
+    Assert.assertEquals(6, countMisses(2, context, "TO_LOWER(name)", "TO_UPPER(name)", "TO_UPPER('foo')", "JOIN([name, 'blah'], ',')"));
+  }
+
+  private Object evaluateExpression(Context context, String expression) {
+    StellarProcessor processor = new CachingStellarProcessor();
+    return processor.parse(expression
+                , new MapVariableResolver(fields)
+                , StellarFunctions.FUNCTION_RESOLVER()
+                , context);
+  }
+
+  private int countMisses(int numRepetition, Context context, String... expressions) {
+    AtomicInteger numExpressions = new AtomicInteger(0);
+    StellarProcessor processor = new CachingStellarProcessor() {
+      @Override
+      protected Object parseUncached(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) {
+        numExpressions.incrementAndGet();
+        return super.parseUncached(expression, variableResolver, functionResolver, context);
+      }
+    };
+
+    for(int i = 0;i < numRepetition;++i) {
+      for(String expression : expressions) {
+        processor.parse(expression
+                , new MapVariableResolver(fields)
+                , StellarFunctions.FUNCTION_RESOLVER()
+                , context);
+      }
+    }
+    return numExpressions.get();
+  }
+}


[04/11] metron git commit: METRON-1539: Specialized RENAME field transformer closes apache/incubator-metron#1002

Posted by ot...@apache.org.
METRON-1539: Specialized RENAME field transformer closes apache/incubator-metron#1002


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/2b4f0b84
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/2b4f0b84
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/2b4f0b84

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 2b4f0b84062d65f9400421d66ec3b7d6d093bebf
Parents: 1c5435c
Author: cstella <ce...@gmail.com>
Authored: Wed Apr 25 11:49:56 2018 -0400
Committer: cstella <ce...@gmail.com>
Committed: Wed Apr 25 11:49:56 2018 -0400

----------------------------------------------------------------------
 .../common/configuration/FieldTransformer.java  |  4 +-
 .../transformation/FieldTransformations.java    |  1 +
 .../transformation/RenameTransformation.java    | 55 +++++++++++
 .../transformation/FieldTransformationTest.java | 17 +---
 .../RenameTransformationTest.java               | 99 ++++++++++++++++++++
 metron-platform/metron-parsers/README.md        | 25 ++++-
 6 files changed, 183 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/2b4f0b84/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
index df80691..43ce9d8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/FieldTransformer.java
@@ -94,7 +94,9 @@ public class FieldTransformer implements Serializable {
 
       if (output == null || output.isEmpty()) {
         if (input == null || input.isEmpty()) {
-          throw new IllegalStateException("You must specify an input field if you want to leave the output fields empty");
+          //both are empty, so let's set them both to null
+          output = null;
+          input = null;
         } else {
           output = input;
         }

http://git-wip-us.apache.org/repos/asf/metron/blob/2b4f0b84/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformations.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformations.java
index a905123..95ff390 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformations.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/FieldTransformations.java
@@ -25,6 +25,7 @@ public enum FieldTransformations {
   ,REMOVE(new RemoveTransformation())
   ,STELLAR(new StellarTransformation())
   ,SELECT(new SelectTransformation())
+  ,RENAME(new RenameTransformation())
   ;
   FieldTransformation mapping;
   FieldTransformations(FieldTransformation mapping) {

http://git-wip-us.apache.org/repos/asf/metron/blob/2b4f0b84/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RenameTransformation.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RenameTransformation.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RenameTransformation.java
new file mode 100644
index 0000000..f8b9374
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/transformation/RenameTransformation.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.field.transformation;
+
+import org.apache.metron.stellar.dsl.Context;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RenameTransformation implements FieldTransformation{
+  @Override
+  public Map<String, Object> map( Map<String, Object> input
+                                , List<String> outputField
+                                , LinkedHashMap<String, Object> fieldMappingConfig
+                                , Context context
+                                , Map<String, Object>... sensorConfig
+                                )
+  {
+    if(fieldMappingConfig == null || fieldMappingConfig.isEmpty()) {
+      return input;
+    }
+    Map<String, Object> ret = new HashMap<>();
+    for(Map.Entry<String, Object> kv : input.entrySet()) {
+      Object renamed = fieldMappingConfig.get(kv.getKey());
+      if(renamed != null) {
+        //if we're renaming, then we want to copy the field to the new name
+        ret.put(renamed.toString(), kv.getValue());
+        //and remove the old field
+        ret.put(kv.getKey(), null);
+      }
+      else {
+        ret.put(kv.getKey(), kv.getValue());
+      }
+    }
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/2b4f0b84/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
index 71a0298..b7557e8 100644
--- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/FieldTransformationTest.java
@@ -85,18 +85,6 @@ public class FieldTransformationTest {
    {
     "fieldTransformations" : [
           {
-           "transformation" : "IP_PROTOCOL"
-          }
-                      ]
-   }
-   */
-  @Multiline
-  public static String badConfigMissingInput;
-
-  /**
-   {
-    "fieldTransformations" : [
-          {
             "input" : "protocol"
           }
                       ]
@@ -113,10 +101,7 @@ public class FieldTransformationTest {
     Assert.assertEquals(ImmutableList.of("protocol"), c.getFieldTransformations().get(0).getInput());
   }
 
-  @Test(expected = IllegalStateException.class)
-  public void testInValidSerde_missingInput() throws IOException {
-    SensorParserConfig.fromBytes(Bytes.toBytes(badConfigMissingInput));
-  }
+
 
   @Test(expected = IllegalStateException.class)
   public void testInValidSerde_missingMapping() throws IOException {

http://git-wip-us.apache.org/repos/asf/metron/blob/2b4f0b84/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RenameTransformationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RenameTransformationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RenameTransformationTest.java
new file mode 100644
index 0000000..cacc818
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/transformation/RenameTransformationTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.common.field.transformation;
+
+import com.google.common.collect.Iterables;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.configuration.FieldTransformer;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class RenameTransformationTest {
+  /**
+   {
+    "fieldTransformations" : [
+          {
+            "transformation" : "RENAME",
+            "config" : {
+              "old_field1" : "new_field1",
+              "old_field2" : "new_field2"
+                      }
+          }
+                             ]
+   }
+   */
+  @Multiline
+  public static String smoketestConfig;
+
+  @Test
+  public void smokeTest() throws Exception {
+    SensorParserConfig c = SensorParserConfig.fromBytes(Bytes.toBytes(smoketestConfig));
+    FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), null);
+    JSONObject input = new JSONObject(new HashMap<String, Object>() {{
+      for(int i = 1;i <= 10;++i) {
+        put("old_field" + i, "f" + i);
+      }
+    }});
+    handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
+    Assert.assertEquals("f1", input.get("new_field1"));
+    Assert.assertEquals("f2", input.get("new_field2"));
+    for(int i = 3;i <= 10;++i) {
+      Assert.assertEquals("f" + i, input.get("old_field" + i));
+    }
+    Assert.assertFalse(input.containsKey("old_field1"));
+    Assert.assertFalse(input.containsKey("old_field2"));
+    Assert.assertEquals(10, input.size());
+  }
+
+  /**
+   {
+    "fieldTransformations" : [
+          {
+            "transformation" : "RENAME",
+            "config" : {
+              "old_field1" : "new_field1"
+                      }
+          }
+                             ]
+   }
+   */
+  @Multiline
+  public static String renameMissingField;
+  @Test
+  public void renameMissingField() throws Exception {
+    SensorParserConfig c = SensorParserConfig.fromBytes(Bytes.toBytes(renameMissingField));
+    FieldTransformer handler = Iterables.getFirst(c.getFieldTransformations(), null);
+    JSONObject input = new JSONObject(new HashMap<String, Object>() {{
+      for(int i = 2;i <= 10;++i) {
+        put("old_field" + i, "f" + i);
+      }
+    }});
+    handler.transformAndUpdate(input, Context.EMPTY_CONTEXT());
+    Assert.assertFalse(input.containsKey("new_field1"));
+    for(int i = 2;i <= 10;++i) {
+      Assert.assertEquals("f" + i, input.get("old_field" + i));
+    }
+    Assert.assertEquals(9, input.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/2b4f0b84/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 1d2d834..e8b2896 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -313,10 +313,33 @@ into `{ "protocol" : "TCP", "source.type" : "bro", ...}`
 * `STELLAR` : This transformation executes a set of transformations
   expressed as [Stellar Language](../metron-common) statements.
 
+* `RENAME` : This transformation allows users to rename a set of fields.  Specifically,
+the config is presumed to be the mapping.  The keys to the config are the existing field names
+and the values for the config map are the associated new field name.
+
+The following config will rename the fields `old_field` and `different_old_field` to
+`new_field` and `different_new_field` respectively:
+```
+{
+...
+    "fieldTransformations" : [
+          {
+            "transformation" : "RENAME",
+          , "config" : {
+            "old_field" : "new_field",
+            "different_old_field" : "different_new_field"
+                       }
+          }
+                      ]
+}
+```
+
+
 ### Assignment to `null`
 
 If, in your field transformation, you assign a field to `null`, the field will be removed.
-You can use this capability to rename variables.
+You can use this capability to rename variables.  It is preferred, however, that the `RENAME`
+field transformation is used in this situation as it is less awkward.
 
 Consider this example:
 ```


[07/11] metron git commit: METRON-1530 Default proxy config settings in metron-contrib need to be updated (sardell via merrimanr) closes apache/metron#998

Posted by ot...@apache.org.
METRON-1530 Default proxy config settings in metron-contrib need to be updated (sardell via merrimanr) closes apache/metron#998


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/1b1a45b7
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/1b1a45b7
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/1b1a45b7

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 1b1a45b7e9c2b8c451f970028b4fe310ef9b7894
Parents: ac05638
Author: sardell <sa...@hortonworks.com>
Authored: Wed May 9 09:43:58 2018 -0500
Committer: merrimanr <me...@apache.org>
Committed: Wed May 9 09:43:58 2018 -0500

----------------------------------------------------------------------
 metron-interface/metron-alerts/proxy.conf.json | 9 ---------
 metron-interface/metron-config/proxy.conf.json | 4 ++--
 2 files changed, 2 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/1b1a45b7/metron-interface/metron-alerts/proxy.conf.json
----------------------------------------------------------------------
diff --git a/metron-interface/metron-alerts/proxy.conf.json b/metron-interface/metron-alerts/proxy.conf.json
index 27d1ee9..612bd67 100644
--- a/metron-interface/metron-alerts/proxy.conf.json
+++ b/metron-interface/metron-alerts/proxy.conf.json
@@ -6,14 +6,5 @@
   "/logout": {
     "target": "http://node1:8082",
     "secure": false
-  },
-  "/search": {
-    "target": "http://node1:9200",
-    "pathRewrite": {"^/search" : ""},
-    "secure": false
-  },
-  "/_cluster": {
-    "target": "http://node1:9200",
-    "secure": false
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/1b1a45b7/metron-interface/metron-config/proxy.conf.json
----------------------------------------------------------------------
diff --git a/metron-interface/metron-config/proxy.conf.json b/metron-interface/metron-config/proxy.conf.json
index 29466cc..612bd67 100644
--- a/metron-interface/metron-config/proxy.conf.json
+++ b/metron-interface/metron-config/proxy.conf.json
@@ -1,10 +1,10 @@
 {
   "/api/v1": {
-    "target": "http://localhost:8080",
+    "target": "http://node1:8082",
     "secure": false
   },
   "/logout": {
-    "target": "http://localhost:8080",
+    "target": "http://node1:8082",
     "secure": false
   }
 }


[11/11] metron git commit: METRON-1549: Add empty object test to WriterBoltIntegrationTest implementation (mmiklavc via mmiklavc) closes apache/metron#1009

Posted by ot...@apache.org.
METRON-1549: Add empty object test to WriterBoltIntegrationTest implementation (mmiklavc via mmiklavc) closes apache/metron#1009


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/b9453aab
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/b9453aab
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/b9453aab

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: b9453aabd781c7c67258d9506af176fbcab85be1
Parents: a17c1ad
Author: mmiklavc <mi...@gmail.com>
Authored: Fri May 11 12:04:01 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Fri May 11 12:04:01 2018 -0600

----------------------------------------------------------------------
 .../integration/WriterBoltIntegrationTest.java  | 315 ++++++++++++++-----
 1 file changed, 231 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/b9453aab/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
index cde08bc..d565147 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/integration/WriterBoltIntegrationTest.java
@@ -17,40 +17,56 @@
  */
 package org.apache.metron.writers.integration;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import javax.annotation.Nullable;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.SensorParserConfig;
-import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.common.field.validation.FieldValidation;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.enrichment.integration.components.ConfigUploadComponent;
-import org.apache.metron.integration.*;
+import org.apache.metron.integration.BaseIntegrationTest;
+import org.apache.metron.integration.ComponentRunner;
+import org.apache.metron.integration.Processor;
+import org.apache.metron.integration.ProcessorResult;
+import org.apache.metron.integration.ReadinessState;
 import org.apache.metron.integration.components.KafkaComponent;
-import org.apache.metron.integration.processors.KafkaMessageSet;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.integration.processors.KafkaMessageSet;
 import org.apache.metron.integration.processors.KafkaProcessor;
-import org.apache.metron.parsers.csv.CSVParser;
 import org.apache.metron.parsers.integration.components.ParserTopologyComponent;
-import org.apache.metron.test.utils.UnitTestHelper;
+import org.apache.metron.parsers.interfaces.MessageParser;
+import org.apache.metron.stellar.dsl.Context;
 import org.json.simple.JSONObject;
-import org.json.simple.parser.ParseException;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
-
 public class WriterBoltIntegrationTest extends BaseIntegrationTest {
+  private ZKServerComponent zkServerComponent;
+  private KafkaComponent kafkaComponent;
+  private ConfigUploadComponent configUploadComponent;
+  private ParserTopologyComponent parserTopologyComponent;
 
-  public static class MockValidator implements FieldValidation{
+  public static class MockValidator implements FieldValidation {
 
     @Override
     public boolean isValid(Map<String, Object> input, Map<String, Object> validationConfig, Map<String, Object> globalConfig, Context context) {
-      if(input.get("action").equals("invalid")) {
+      if (input.get("action").equals("invalid")) {
         return false;
       }
       return true;
@@ -60,6 +76,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     public void initialize(Map<String, Object> validationConfig, Map<String, Object> globalConfig) {
     }
   }
+
   /**
    * {
    *   "fieldValidations" : [
@@ -68,7 +85,7 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
    * }
    */
   @Multiline
-  public static String globalConfig;
+  public static String globalConfigWithValidation;
 
   /**
    * {
@@ -88,57 +105,23 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
   public static String parserConfigJSON;
 
   @Test
-  public void test() throws UnableToStartException, IOException, ParseException {
-
-    UnitTestHelper.setLog4jLevel(CSVParser.class, org.apache.log4j.Level.FATAL);
+  public void parser_with_global_validations_writes_bad_records_to_error_topic() throws Exception {
     final String sensorType = "dummy";
-
     SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(parserConfigJSON, SensorParserConfig.class);
-
-    // the input messages to parser
     final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
       add(Bytes.toBytes("valid,foo"));
       add(Bytes.toBytes("invalid,foo"));
       add(Bytes.toBytes("error"));
     }};
 
-    // setup external components; zookeeper, kafka
     final Properties topologyProperties = new Properties();
-    final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
-    final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
-      add(new KafkaComponent.Topic(sensorType, 1));
-      add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
-      add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
-    }});
-    topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
-
-    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
-            .withTopologyProperties(topologyProperties)
-            .withGlobalConfig(globalConfig)
-            .withParserSensorConfig(sensorType, parserConfig);
-
-    ParserTopologyComponent parserTopologyComponent = new ParserTopologyComponent.Builder()
-            .withSensorType(sensorType)
-            .withTopologyProperties(topologyProperties)
-            .withBrokerUrl(kafkaComponent.getBrokerList())
-            .withErrorTopic(parserConfig.getErrorTopic())
-            .withOutputTopic(parserConfig.getOutputTopic())
-            .build();
-
-    ComponentRunner runner = new ComponentRunner.Builder()
-            .withComponent("zk", zkServerComponent)
-            .withComponent("kafka", kafkaComponent)
-            .withComponent("config", configUploadComponent)
-            .withComponent("org/apache/storm", parserTopologyComponent)
-            .withMillisecondsBetweenAttempts(5000)
-            .withNumRetries(10)
-            .withCustomShutdownOrder(new String[]{"org/apache/storm","config","kafka","zk"})
-            .build();
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigWithValidation);
     try {
       runner.start();
       kafkaComponent.writeMessages(sensorType, inputMessages);
-      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(
-              getProcessor(parserConfig.getOutputTopic(), parserConfig.getErrorTopic()));
+      KafkaProcessor<Map<String, List<JSONObject>>> kafkaProcessor = getKafkaProcessor(
+          parserConfig.getOutputTopic(), parserConfig.getErrorTopic());
+      ProcessorResult<Map<String, List<JSONObject>>> result = runner.process(kafkaProcessor);
 
       // validate the output messages
       Map<String,List<JSONObject>> outputMessages = result.getResult();
@@ -166,45 +149,209 @@ public class WriterBoltIntegrationTest extends BaseIntegrationTest {
     }
   }
 
+  /**
+   * Setup external components (as side effects of invoking this method):
+   * zookeeper, kafka, config upload, parser topology, main runner.
+   *
+   * Modifies topology properties with relevant component properties, e.g. kafka.broker.
+   *
+   * @return runner
+   */
+  public ComponentRunner setupTopologyComponents(Properties topologyProperties, String sensorType,
+      SensorParserConfig parserConfig, String globalConfig) {
+    zkServerComponent = getZKServerComponent(topologyProperties);
+    kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
+      add(new KafkaComponent.Topic(sensorType, 1));
+      add(new KafkaComponent.Topic(parserConfig.getErrorTopic(), 1));
+      add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
+    }});
+    topologyProperties.setProperty("kafka.broker", kafkaComponent.getBrokerList());
+
+    configUploadComponent = new ConfigUploadComponent()
+        .withTopologyProperties(topologyProperties)
+        .withGlobalConfig(globalConfig)
+        .withParserSensorConfig(sensorType, parserConfig);
+
+    parserTopologyComponent = new ParserTopologyComponent.Builder()
+        .withSensorType(sensorType)
+        .withTopologyProperties(topologyProperties)
+        .withBrokerUrl(kafkaComponent.getBrokerList())
+        .withErrorTopic(parserConfig.getErrorTopic())
+        .withOutputTopic(parserConfig.getOutputTopic())
+        .build();
+
+    return new ComponentRunner.Builder()
+        .withComponent("zk", zkServerComponent)
+        .withComponent("kafka", kafkaComponent)
+        .withComponent("config", configUploadComponent)
+        .withComponent("org/apache/storm", parserTopologyComponent)
+        .withMillisecondsBetweenAttempts(5000)
+        .withNumRetries(10)
+        .withCustomShutdownOrder(new String[]{"org/apache/storm","config","kafka","zk"})
+        .build();
+  }
+
+  @SuppressWarnings("unchecked")
+  private KafkaProcessor<Map<String, List<JSONObject>>> getKafkaProcessor(String outputTopic,
+      String errorTopic) {
+
+    return new KafkaProcessor<>()
+        .withKafkaComponentName("kafka")
+        .withReadTopic(outputTopic)
+        .withErrorTopic(errorTopic)
+        .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
+          @Nullable
+          @Override
+          public Boolean apply(@Nullable KafkaMessageSet messageSet) {
+            return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2);
+          }
+        })
+        .withProvideResult(new Function<KafkaMessageSet, Map<String, List<JSONObject>>>() {
+          @Nullable
+          @Override
+          public Map<String, List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
+            return new HashMap<String, List<JSONObject>>() {{
+              put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
+              put(errorTopic, loadMessages(messageSet.getErrors()));
+            }};
+          }
+        });
+  }
+
   private static List<JSONObject> loadMessages(List<byte[]> outputMessages) {
     List<JSONObject> tmp = new ArrayList<>();
     Iterables.addAll(tmp,
-            Iterables.transform(outputMessages,
-                    message -> {
-                      try {
-                        return new JSONObject(JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER));
-                      } catch (Exception ex) {
-                        throw new IllegalStateException(ex);
-                      }
-                    }
-            )
+        Iterables.transform(outputMessages,
+            message -> {
+              try {
+                return new JSONObject(
+                    JSONUtils.INSTANCE.load(new String(message), JSONUtils.MAP_SUPPLIER));
+              } catch (Exception ex) {
+                throw new IllegalStateException(ex);
+              }
+            }
+        )
     );
     return tmp;
   }
 
-  @SuppressWarnings("unchecked")
-  private KafkaProcessor<Map<String,List<JSONObject>>> getProcessor(String outputTopic, String errorTopic){
+  /**
+   * { }
+   */
+  @Multiline
+  public static String globalConfigEmpty;
 
-    return new KafkaProcessor<>()
-            .withKafkaComponentName("kafka")
-            .withReadTopic(outputTopic)
-            .withErrorTopic(errorTopic)
-            .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() {
-              @Nullable
-              @Override
-              public Boolean apply(@Nullable KafkaMessageSet messageSet) {
-                return (messageSet.getMessages().size() == 1) && (messageSet.getErrors().size() == 2);
-              }
-            })
-            .withProvideResult(new Function<KafkaMessageSet,Map<String,List<JSONObject>>>(){
-              @Nullable
-              @Override
-              public Map<String,List<JSONObject>> apply(@Nullable KafkaMessageSet messageSet) {
-                return new HashMap<String, List<JSONObject>>() {{
-                  put(Constants.ENRICHMENT_TOPIC, loadMessages(messageSet.getMessages()));
-                  put(errorTopic, loadMessages(messageSet.getErrors()));
-                }};
-              }
-            });
+  /**
+   * {
+   *    "parserClassName":"org.apache.metron.writers.integration.WriterBoltIntegrationTest$EmptyObjectParser",
+   *    "sensorTopic":"emptyobjectparser",
+   *    "outputTopic": "enrichments",
+   *    "errorTopic": "parser_error"
+   * }
+   */
+  @Multiline
+  public static String offsetParserConfigJSON;
+
+  @Test
+  public void commits_kafka_offsets_for_emtpy_objects() throws Exception {
+    final String sensorType = "emptyobjectparser";
+    SensorParserConfig parserConfig = JSONUtils.INSTANCE.load(offsetParserConfigJSON, SensorParserConfig.class);
+    final List<byte[]> inputMessages = new ArrayList<byte[]>() {{
+      add(Bytes.toBytes("foo"));
+      add(Bytes.toBytes("bar"));
+      add(Bytes.toBytes("baz"));
+    }};
+    final Properties topologyProperties = new Properties();
+    ComponentRunner runner = setupTopologyComponents(topologyProperties, sensorType, parserConfig, globalConfigEmpty);
+    try {
+      runner.start();
+      kafkaComponent.writeMessages(sensorType, inputMessages);
+      Processor allResultsProcessor = new AllResultsProcessor(inputMessages, Constants.ENRICHMENT_TOPIC);
+      ProcessorResult<Set<JSONObject>> result = runner.process(allResultsProcessor);
+
+      // validate the output messages
+      assertThat("size should match", result.getResult().size(), equalTo(inputMessages.size()));
+      for (JSONObject record : result.getResult()) {
+        assertThat("record should have a guid", record.containsKey("guid"), equalTo(true));
+        assertThat("record should have correct source.type", record.get("source.type"),
+            equalTo(sensorType));
+      }
+    } finally {
+      if (runner != null) {
+        runner.stop();
+      }
+    }
+  }
+
+  /**
+   * Goal is to check returning an empty JSONObject in our List returned by parse.
+   */
+  public static class EmptyObjectParser implements MessageParser<JSONObject>, Serializable {
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public List<JSONObject> parse(byte[] bytes) {
+      return ImmutableList.of(new JSONObject());
+    }
+
+    @Override
+    public boolean validate(JSONObject message) {
+      return true;
     }
+
+    @Override
+    public void configure(Map<String, Object> map) {
+    }
+  }
+
+  /**
+   * Verifies all messages in the provided List of input messages appears in the specified
+   * Kafka output topic
+   */
+  private class AllResultsProcessor implements  Processor<Set<JSONObject>> {
+
+    private final List<byte[]> inputMessages;
+    private String outputKafkaTopic;
+    // used for calculating readiness and returning result set
+    private final Set<JSONObject> outputMessages = new HashSet<>();
+
+    public AllResultsProcessor(List<byte[]> inputMessages, String outputKafkaTopic) {
+      this.inputMessages = inputMessages;
+      this.outputKafkaTopic = outputKafkaTopic;
+    }
+
+    @Override
+    public ReadinessState process(ComponentRunner runner) {
+      KafkaComponent kc = runner.getComponent("kafka", KafkaComponent.class);
+      outputMessages.addAll(readMessagesFromKafka(kc, outputKafkaTopic));
+      return calcReadiness(inputMessages.size(), outputMessages.size());
+    }
+
+    private Set<JSONObject> readMessagesFromKafka(KafkaComponent kc, String topic) {
+      Set<JSONObject> out = new HashSet<>();
+      for (byte[] b : kc.readMessages(topic)) {
+        try {
+          JSONObject m = new JSONObject(
+              JSONUtils.INSTANCE.load(new String(b), JSONUtils.MAP_SUPPLIER));
+          out.add(m);
+        } catch (IOException e) {
+          throw new IllegalStateException(e);
+        }
+      }
+      return out;
+    }
+
+    private ReadinessState calcReadiness(int in, int out) {
+      return in == out ? ReadinessState.READY : ReadinessState.NOT_READY;
+    }
+
+    @Override
+    public ProcessorResult<Set<JSONObject>> getResult() {
+      return new ProcessorResult<>(outputMessages, null);
+    }
+  }
+
 }


[06/11] metron git commit: METRON-1545 Upgrade Spring and Spring Boot (merrimanr) closes apache/metron#1008

Posted by ot...@apache.org.
METRON-1545 Upgrade Spring and Spring Boot (merrimanr) closes apache/metron#1008


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/ac056381
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/ac056381
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/ac056381

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: ac05638160288b32535b986adbeb8f14e594e740
Parents: 3bb926d
Author: merrimanr <me...@gmail.com>
Authored: Fri May 4 16:22:17 2018 -0500
Committer: merrimanr <me...@apache.org>
Committed: Fri May 4 16:22:17 2018 -0500

----------------------------------------------------------------------
 dependencies_with_url.csv                       | 56 +++++++++++++++++++-
 metron-interface/metron-rest/README.md          |  2 +
 metron-interface/metron-rest/pom.xml            | 17 ++++--
 .../metron/rest/MetronRestApplication.java      |  7 +++
 .../apache/metron/rest/MetronRestConstants.java |  2 +
 .../metron/rest/config/JpaConfiguration.java    |  6 +--
 .../metron/rest/config/WebSecurityConfig.java   | 19 +++++--
 .../src/main/resources/application-test.yml     |  2 +-
 .../metron-rest/src/main/scripts/metron-rest.sh |  2 +-
 9 files changed, 97 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/ac056381/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 1e73eb1..df3bcd2 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -4,6 +4,7 @@ org.objenesis:objenesis:jar:1.2:compile,Apache v2,http://objenesis.org/
 org.objenesis:objenesis:jar:2.1:compile,Apache v2,http://objenesis.org/
 org.ow2.asm:asm:jar:4.1:compile,BSD,http://asm.ow2.org/
 org.ow2.asm:asm:jar:5.0.3:compile,BSD,http://asm.ow2.org/
+org.ow2.asm:asm:jar:5.0.4:compile,BSD,http://asm.ow2.org/
 org.reflections:reflections:jar:0.9.10:compile,BSD,https://github.com/ronmamo/reflections
 org.javassist:javassist:jar:3.19.0-GA:compile,Apache v2,https://github.com/jboss-javassist/javassist
 org.javassist:javassist:jar:3.17.1-GA:compile,Apache v2,https://github.com/jboss-javassist/javassist
@@ -23,6 +24,7 @@ com.google.protobuf:protobuf-java:jar:2.5.0:compile,New BSD license,http://code.
 com.google.protobuf:protobuf-java:jar:2.6.1:compile,New BSD license,http://code.google.com/p/protobuf
 com.jcraft:jsch:jar:0.1.42:compile,BSD,http://www.jcraft.com/jsch/
 com.jayway.jsonpath:json-path:jar:2.3.0:compile,Apache v2,https://github.com/json-path/JsonPath
+com.jayway.jsonpath:json-path:jar:2.4.0:compile,Apache v2,https://github.com/json-path/JsonPath
 net.minidev:accessors-smart:jar:1.2:compile,Apache v2,https://github.com/netplex/json-smart-v2
 net.minidev:json-smart:jar:2.3:compile,Apache v2,https://github.com/netplex/json-smart-v2
 com.maxmind.db:maxmind-db:jar:1.2.1:compile,CC-BY-SA 3.0,https://github.com/maxmind/MaxMind-DB
@@ -34,10 +36,12 @@ it.unimi.dsi:fastutil:jar:7.0.6:compile,ASLv2,https://github.com/vigna/fastutil
 javassist:javassist:jar:3.12.1.GA:compile,Apache v2,http://www.javassist.org/
 javax.activation:activation:jar:1.1:compile,Common Development and Distribution License (CDDL) v1.0,http://java.sun.com/products/javabeans/jaf/index.jsp
 javax.annotation:jsr250-api:jar:1.0:compile,COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0,http://jcp.org/aboutJava/communityprocess/final/jsr250/index.html
+javax.annotation:javax.annotation-api:jar:1.3.2:compile,CDDL 1.1,https://github.com/javaee/javax.annotation/
 javax.mail:mail:jar:1.4:compile,Common Development and Distribution License (CDDL) v1.0,https://glassfish.dev.java.net/javaee5/mail/
 javax.servlet:javax.servlet-api:jar:3.1.0:compile,CDDL,http://servlet-spec.java.net
 javax.xml.bind:jaxb-api:jar:2.2.11:compile,CDDL,http://jaxb.java.net/
 javax.xml.bind:jaxb-api:jar:2.2.2:compile,CDDL,https://jaxb.dev.java.net/
+javax.xml.bind:jaxb-api:jar:2.3.0:compile,CDDL,https://jaxb.dev.java.net/
 javax.xml.stream:stax-api:jar:1.0-2:compile,COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0,https://docs.oracle.com/javase/7/docs/api/javax/xml/stream/package-summary.html
 jline:jline:jar:0.9.94:compile,BSD,http://jline.sourceforge.net
 junit:junit:jar:4.12:compile,Eclipse Public License 1.0,http://junit.org
@@ -87,15 +91,18 @@ org.slf4j:slf4j-api:jar:1.7.5:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-api:jar:1.7.6:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-api:jar:1.7.7:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-api:jar:1.7.21:compile,MIT,http://www.slf4j.org
+org.slf4j:slf4j-api:jar:1.7.25:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-log4j12:jar:1.6.1:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-log4j12:jar:1.7.10:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-log4j12:jar:1.7.10:runtime,MIT,http://www.slf4j.org
 org.slf4j:slf4j-log4j12:jar:1.7.21:compile,MIT,http://www.slf4j.org
+org.slf4j:slf4j-log4j12:jar:1.7.25:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-log4j12:jar:1.7.5:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-log4j12:jar:1.7.7:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-simple:jar:1.7.7:compile,MIT,http://www.slf4j.org
 org.slf4j:jcl-over-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
 org.slf4j:jul-to-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
+org.slf4j:jul-to-slf4j:jar:1.7.25:compile,MIT,http://www.slf4j.org
 aopalliance:aopalliance:jar:1.0:compile,Public Domain,http://aopalliance.sourceforge.net
 com.101tec:zkclient:jar:0.8:compile,The Apache Software License, Version 2.0,https://github.com/sgroschupf/zkclient
 com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile,Apache License, Version 2.0,http://stephenc.github.com/findbugs-annotations
@@ -111,14 +118,17 @@ com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile,BSD,https://g
 com.fasterxml.jackson.core:jackson-annotations:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
 com.fasterxml.jackson.core:jackson-annotations:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-annotations:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-core:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
 com.fasterxml.jackson.core:jackson-core:jar:2.6.6:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.7.4:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.8.3:compile,ASLv2,https://github.com/FasterXML/jackson-core
+com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-databind:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
 com.fasterxml.jackson.core:jackson-databind:jar:2.4.3:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-databind:jar:2.9.5:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForCbor
 com.fasterxml.jackson.dataformat:jackson-dataformat-smile:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForSmile
 com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.6.6:compile,ASLv2,https://github.com/FasterXML/jackson
@@ -126,7 +136,12 @@ com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.7.4:compile,ASLv2
 com.fasterxml.jackson.dataformat:jackson-dataformat-smile:jar:2.7.4:compile,ASLv2,http://wiki.fasterxml.com/JacksonForSmile
 com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.7.4:compile,ASLv2,https://github.com/FasterXML/jackson
 com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.8.1:compile,ASLv2,https://github.com/FasterXML/jackson-datatype-joda
+com.fasterxml.jackson.datatype:jackson-datatype-joda:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-datatype-joda
+com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
+com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
+com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
 com.fasterxml:classmate:jar:1.3.1:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
+com.fasterxml:classmate:jar:1.3.4:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
 com.google.code.gson:gson:jar:2.2.4:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
 com.google.code.gson:gson:jar:2.7:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
 com.google.guava:guava:jar:11.0.2:compile,ASLv2,
@@ -158,6 +173,7 @@ commons-codec:commons-codec:jar:1.10:compile,ASLv2,http://commons.apache.org/pro
 commons-codec:commons-codec:jar:1.4:compile,ASLv2,http://commons.apache.org/codec/
 commons-codec:commons-codec:jar:1.6:compile,ASLv2,http://commons.apache.org/codec/
 commons-codec:commons-codec:jar:1.9:compile,ASLv2,http://commons.apache.org/proper/commons-codec/
+commons-codec:commons-codec:jar:1.11:compile,ASLv2,http://commons.apache.org/proper/commons-codec/
 commons-collections:commons-collections:jar:3.2.1:compile,ASLv2,http://commons.apache.org/collections/
 commons-collections:commons-collections:jar:3.2.2:compile,ASLv2,http://commons.apache.org/collections/
 commons-configuration:commons-configuration:jar:1.10:compile,ASLv2,http://commons.apache.org/configuration/
@@ -197,6 +213,7 @@ io.thekraken:grok:jar:0.1.0:compile,Apache License, Version 2.0,http://maven.apa
 javax.inject:javax.inject:jar:1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/atinject/
 joda-time:joda-time:jar:2.3:compile,Apache 2,http://www.joda.org/joda-time/
 joda-time:joda-time:jar:2.8.2:compile,Apache 2,http://www.joda.org/joda-time/
+joda-time:joda-time:jar:2.9.9:compile,Apache 2,http://www.joda.org/joda-time/
 log4j:log4j:jar:1.2.15:compile,The Apache Software License, Version 2.0,http://logging.apache.org:80/log4j/1.2/
 log4j:log4j:jar:1.2.16:compile,The Apache Software License, Version 2.0,http://logging.apache.org/log4j/1.2/
 log4j:log4j:jar:1.2.17:compile,The Apache Software License, Version 2.0,http://logging.apache.org/log4j/1.2/
@@ -234,24 +251,36 @@ org.springframework.integration:spring-integration-http:jar:3.0.0.RELEASE:compil
 org.springframework.retry:spring-retry:jar:1.0.3.RELEASE:compile,Apache 2.0,http://www.springsource.org
 org.springframework:spring-aop:jar:3.2.6.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/SpringSource/spring-framework
 org.springframework:spring-aop:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-aop:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-aspects:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-aspects:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-beans:jar:3.2.6.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/SpringSource/spring-framework
 org.springframework:spring-beans:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-beans:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-context:jar:3.2.6.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/SpringSource/spring-framework
 org.springframework:spring-context:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-context:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-core:jar:3.2.6.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/SpringSource/spring-framework
 org.springframework:spring-core:jar:4.1.4.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-core:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-core:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-expression:jar:3.2.6.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/SpringSource/spring-framework
 org.springframework:spring-expression:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-expression:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-jcl:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-jdbc:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-jdbc:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-orm:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-orm:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-tx:jar:3.2.6.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/SpringSource/spring-framework
 org.springframework:spring-tx:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-tx:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-web:jar:3.2.6.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/SpringSource/spring-framework
 org.springframework:spring-web:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-web:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.springframework:spring-webmvc:jar:3.2.6.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/SpringSource/spring-framework
 org.springframework:spring-webmvc:jar:4.3.3.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
+org.springframework:spring-webmvc:jar:5.0.5.RELEASE:compile,The Apache Software License, Version 2.0,https://github.com/spring-projects/spring-framework
 org.tukaani:xz:jar:1.0:compile,Public Domain,http://tukaani.org/xz/java.html
 org.xerial.snappy:snappy-java:jar:1.0.4.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/snappy-java/
 org.xerial.snappy:snappy-java:jar:1.1.1.7:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
@@ -277,30 +306,50 @@ io.swagger:swagger-annotations:jar:1.5.9:compile,ASLv2,https://github.com/swagge
 io.swagger:swagger-models:jar:1.5.9:compile,ASLv2,https://github.com/swagger-api/swagger-core
 javax.transaction:javax.transaction-api:jar:1.2:compile,CDDL-1.0,https://java.net/projects/jta-spec/
 javax.validation:validation-api:jar:1.1.0.Final:compile,ASLv2,http://beanvalidation.org
+javax.validation:validation-api:jar:2.0.1.Final:compile,ASLv2,http://beanvalidation.org
 joda-time:joda-time:jar:2.9.4:compile,ASLv2,https://github.com/JodaOrg/joda-time
 org.aspectj:aspectjweaver:jar:1.8.9:compile,EPL 1.0,https://eclipse.org/aspectj
+org.aspectj:aspectjweaver:jar:1.8.13:compile,EPL 1.0,https://eclipse.org/aspectj
 org.jboss.logging:jboss-logging:jar:3.3.0.Final:compile,ASLv2,https://github.com/jboss-logging
+org.jboss.logging:jboss-logging:jar:3.3.2.Final:compile,ASLv2,https://github.com/jboss-logging
 org.jboss:jandex:jar:2.0.0.Final:compile,ASLv2,https://github.com/wildfly/jandex
 org.mapstruct:mapstruct:jar:1.0.0.Final:compile,ASLv2,https://github.com/mapstruct/mapstruct
 org.springframework.boot:spring-boot-autoconfigure:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-autoconfigure:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.boot:spring-boot-starter-aop:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-starter-aop:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.boot:spring-boot-starter-data-jpa:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-starter-data-jpa:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.boot:spring-boot-starter-jdbc:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-starter-jdbc:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-starter-json:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.boot:spring-boot-starter-logging:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-starter-logging:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.boot:spring-boot-starter-security:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-starter-security:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.boot:spring-boot-starter-tomcat:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-starter-tomcat:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.boot:spring-boot-starter-web:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-starter-web:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.boot:spring-boot-starter:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot-starter:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.boot:spring-boot:jar:1.4.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
+org.springframework.boot:spring-boot:jar:2.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-boot
 org.springframework.data:spring-data-commons:jar:1.12.3.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-data-commons
+org.springframework.data:spring-data-commons:jar:2.0.6.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-data-commons
 org.springframework.data:spring-data-jpa:jar:1.10.3.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-data-jpa
+org.springframework.data:spring-data-jpa:jar:2.0.6.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-data-jpa
 org.springframework.plugin:spring-plugin-core:jar:1.2.0.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-plugin
 org.springframework.plugin:spring-plugin-metadata:jar:1.2.0.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-plugin
 org.springframework.security:spring-security-config:jar:4.1.3.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
+org.springframework.security:spring-security-config:jar:5.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
 org.springframework.security:spring-security-core:jar:4.1.3.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
+org.springframework.security:spring-security-core:jar:5.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
 org.springframework.security:spring-security-web:jar:4.1.3.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
+org.springframework.security:spring-security-web:jar:5.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security
 antlr:antlr:jar:2.7.7:compile,BSD 3-Clause License,http://www.antlr2.org
 com.h2database:h2:jar:1.4.192:compile,EPL 1.0,http://www.h2database.com/html/license.html
+com.h2database:h2:jar:1.4.197:compile,EPL 1.0,http://www.h2database.com/html/license.html
 de.jollyday:jollyday:jar:0.5.2:compile,ASLv2,http://jollyday.sourceforge.net/license.html
 org.threeten:threeten-extra:jar:1.0:compile,BSD,http://www.threeten.org/threeten-extra/license.html
 org.atteo.classindex:classindex:jar:3.3:compile,ASLv2,https://github.com/atteo/classindex
@@ -311,6 +360,7 @@ net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/b
 org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
 org.springframework.security.kerberos:spring-security-kerberos-core:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
 org.springframework.kafka:spring-kafka:jar:1.1.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka
+org.springframework.kafka:spring-kafka:jar:2.0.4.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka
 ch.hsr:geohash:jar:1.3.0:compile,ASLv2,https://github.com/kungfoo/geohash-java
 org.locationtech.spatial4j:spatial4j:jar:0.6:compile,ASLv2,https://github.com/locationtech/spatial4j
 com.github.spullara.mustache.java:compiler:jar:0.9.3:compile,ASLv2,https://github.com/spullara/mustache.java/blob/master/LICENSE
@@ -345,7 +395,8 @@ org.eclipse.persistence:org.eclipse.persistence.core:jar:2.6.4:compile,EPL 1.0,h
 org.eclipse.persistence:org.eclipse.persistence.jpa.jpql:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
 org.eclipse.persistence:org.eclipse.persistence.jpa:jar:2.6.4:compile,EPL 1.0,http://www.eclipse.org/eclipselink
 com.github.ben-manes.caffeine:caffeine:jar:2.6.2:compile,ASLv2,https://github.com/ben-manes/caffeine/blob/v2.6.2/LICENSE
-com.google.code.gson:gson:jar:2.2:compile
+com.google.code.gson:gson:jar:2.2:compile,ASLv2,https://github.com/google/gson
+com.google.code.gson:gson:jar:2.8.2:compile,ASLv2,https://github.com/google/gson
   org.codehaus.plexus:plexus-classworlds:jar:2.4:compile
   org.codehaus.plexus:plexus-component-annotations:jar:1.5.5:compile
   org.codehaus.plexus:plexus-interpolation:jar:1.14:compile
@@ -360,3 +411,6 @@ com.google.code.gson:gson:jar:2.2:compile
   org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile
   org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile
   org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
+com.zaxxer:HikariCP:jar:2.7.8:compile,ASLv2,https://github.com/brettwooldridge/HikariCP
+org.hibernate.validator:hibernate-validator:jar:6.0.9.Final:compile,ASLv2,https://github.com/hibernate/hibernate-validator
+

http://git-wip-us.apache.org/repos/asf/metron/blob/ac056381/metron-interface/metron-rest/README.md
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md
index c928d8f..2a6a0e0 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -135,6 +135,8 @@ The following configures the application for MySQL:
     GRANT ALL PRIVILEGES ON metronrest.* TO 'metron'@'node1';
     ```
 
+1. Create the security tables as described in the [Spring Security Guide](https://docs.spring.io/spring-security/site/docs/5.0.4.RELEASE/reference/htmlsingle/#user-schema).
+
 1. Install the MySQL JDBC client onto the REST application host and configurate the METRON_JDBC_CLIENT_PATH variable:
     ```
     cd $METRON_HOME/lib

http://git-wip-us.apache.org/repos/asf/metron/blob/ac056381/metron-interface/metron-rest/pom.xml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml
index dcdea2b..13d23fe 100644
--- a/metron-interface/metron-rest/pom.xml
+++ b/metron-interface/metron-rest/pom.xml
@@ -29,13 +29,15 @@
         <antlr.version>4.5</antlr.version>
         <curator.version>2.7.1</curator.version>
         <powermock.version>1.6.4</powermock.version>
-        <spring.boot.version>1.4.1.RELEASE</spring.boot.version>
+        <spring.boot.version>2.0.1.RELEASE</spring.boot.version>
         <spring.kerberos.version>1.0.1.RELEASE</spring.kerberos.version>
         <swagger.version>2.5.0</swagger.version>
         <mysql.client.version>5.1.40</mysql.client.version>
-        <spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
-        <spring.version>4.2.2.RELEASE</spring.version>
+        <spring-kafka.version>2.0.4.RELEASE</spring-kafka.version>
+        <spring.version>5.0.5.RELEASE</spring.version>
         <eclipse.link.version>2.6.4</eclipse.link.version>
+        <jackson.version>2.9.5</jackson.version>
+        <jsonpath.version>2.4.0</jsonpath.version>
     </properties>
     <dependencies>
       <dependency>
@@ -129,12 +131,12 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>2.8.3</version>
+            <version>${jackson.version}</version>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.datatype</groupId>
             <artifactId>jackson-datatype-joda</artifactId>
-            <version>2.8.1</version>
+            <version>${jackson.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
@@ -223,6 +225,11 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>${jsonpath.version}</version>
+        </dependency>
+        <dependency>
             <groupId>io.springfox</groupId>
             <artifactId>springfox-swagger2</artifactId>
             <version>${swagger.version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/ac056381/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java
index 5135849..52cdf8f 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestApplication.java
@@ -19,13 +19,20 @@ package org.apache.metron.rest;
 
 import org.apache.metron.rest.util.ParserIndex;
 import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.gson.GsonAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+
+import static org.apache.metron.rest.MetronRestConstants.LOGGING_SYSTEM_PROPERTY;
 
 @SpringBootApplication
+@EnableAutoConfiguration(exclude = { GsonAutoConfiguration.class, KafkaAutoConfiguration.class })
 public class MetronRestApplication {
 
   public static void main(String[] args) {
     ParserIndex.reload();
+    System.setProperty(LOGGING_SYSTEM_PROPERTY, "none");
     SpringApplication.run(MetronRestApplication.class, args);
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/ac056381/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index 4567197..c4873f9 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -72,4 +72,6 @@ public class MetronRestConstants {
 
   public static final String USER_SETTINGS_HBASE_TABLE_SPRING_PROPERTY = "user.settings.table";
   public static final String USER_SETTINGS_HBASE_CF_SPRING_PROPERTY = "user.settings.cf";
+
+  public static final String LOGGING_SYSTEM_PROPERTY = "org.springframework.boot.logging.LoggingSystem";
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/ac056381/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/JpaConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/JpaConfiguration.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/JpaConfiguration.java
index 80c9d1a..f1d48e1 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/JpaConfiguration.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/JpaConfiguration.java
@@ -25,6 +25,7 @@ import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.boot.autoconfigure.domain.EntityScan;
 import org.springframework.boot.autoconfigure.orm.jpa.JpaBaseConfiguration;
 import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
+import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.orm.jpa.vendor.AbstractJpaVendorAdapter;
 import org.springframework.orm.jpa.vendor.EclipseLinkJpaVendorAdapter;
@@ -34,9 +35,8 @@ import org.springframework.transaction.jta.JtaTransactionManager;
 @EntityScan("org.apache.metron")
 public class JpaConfiguration extends JpaBaseConfiguration {
 
-  protected JpaConfiguration(DataSource dataSource, JpaProperties properties,
-      ObjectProvider<JtaTransactionManager> jtaTransactionManagerProvider) {
-    super(dataSource, properties, jtaTransactionManagerProvider);
+  protected JpaConfiguration(DataSource dataSource, JpaProperties properties, ObjectProvider<JtaTransactionManager> jtaTransactionManager, ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
+    super(dataSource, properties, jtaTransactionManager, transactionManagerCustomizers);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/metron/blob/ac056381/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/WebSecurityConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/WebSecurityConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/WebSecurityConfig.java
index 36846de..f84cdfa 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/WebSecurityConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/WebSecurityConfig.java
@@ -22,6 +22,7 @@ import static org.apache.metron.rest.MetronRestConstants.SECURITY_ROLE_USER;
 
 import org.apache.metron.rest.MetronRestConstants;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.env.Environment;
 import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
@@ -29,6 +30,9 @@ import org.springframework.security.config.annotation.method.configuration.Enabl
 import org.springframework.security.config.annotation.web.builders.HttpSecurity;
 import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
 import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.crypto.password.NoOpPasswordEncoder;
+import org.springframework.security.crypto.password.PasswordEncoder;
 import org.springframework.security.web.authentication.logout.HttpStatusReturningLogoutSuccessHandler;
 import org.springframework.security.web.csrf.CookieCsrfTokenRepository;
 import org.springframework.stereotype.Controller;
@@ -87,13 +91,18 @@ public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
         List<String> activeProfiles = Arrays.asList(environment.getActiveProfiles());
         if (activeProfiles.contains(MetronRestConstants.DEV_PROFILE) ||
                 activeProfiles.contains(MetronRestConstants.TEST_PROFILE)) {
-            auth.jdbcAuthentication().dataSource(dataSource)
-                    .withUser("user").password("password").roles(SECURITY_ROLE_USER).and()
-                    .withUser("user1").password("password").roles(SECURITY_ROLE_USER).and()
-                    .withUser("user2").password("password").roles(SECURITY_ROLE_USER).and()
-                    .withUser("admin").password("password").roles(SECURITY_ROLE_USER, SECURITY_ROLE_ADMIN);
+          auth.jdbcAuthentication().dataSource(dataSource)
+                  .withUser("user").password("password").roles(SECURITY_ROLE_USER).and()
+                  .withUser("user1").password("password").roles(SECURITY_ROLE_USER).and()
+                  .withUser("user2").password("password").roles(SECURITY_ROLE_USER).and()
+                  .withUser("admin").password("password").roles(SECURITY_ROLE_USER, SECURITY_ROLE_ADMIN);
         } else {
             auth.jdbcAuthentication().dataSource(dataSource);
         }
     }
+
+    @Bean
+    public PasswordEncoder passwordEncoder() {
+        return NoOpPasswordEncoder.getInstance();
+    }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/ac056381/metron-interface/metron-rest/src/main/resources/application-test.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application-test.yml b/metron-interface/metron-rest/src/main/resources/application-test.yml
index 891f554..0e794cb 100644
--- a/metron-interface/metron-rest/src/main/resources/application-test.yml
+++ b/metron-interface/metron-rest/src/main/resources/application-test.yml
@@ -24,7 +24,7 @@ spring:
     hibernate:
       ddl-auto: create-drop
   main:
-    banner-mode: off
+    banner-mode: 'off'
 
 grok:
   path:

http://git-wip-us.apache.org/repos/asf/metron/blob/ac056381/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
index c293566..420c539 100644
--- a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
+++ b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
@@ -78,7 +78,7 @@ echo "METRON_SPRING_PROFILES_ACTIVE=${METRON_SPRING_PROFILES_ACTIVE}"
 
 # the vagrant Spring profile provides configuration values, otherwise configuration is provided by rest_application.yml
 if [[ !(${METRON_SPRING_PROFILES_ACTIVE} == *"vagrant"*) ]]; then
-    METRON_CONFIG_LOCATION=" --spring.config.location=$METRON_HOME/config/rest_application.yml"
+    METRON_CONFIG_LOCATION=" --spring.config.location=$METRON_HOME/config/rest_application.yml,classpath:/application.yml"
     echo "METRON_CONFIG_LOCATION=${METRON_CONFIG_LOCATION}"
     METRON_SPRING_OPTIONS+=${METRON_CONFIG_LOCATION}
 fi