Posted to commits@metron.apache.org by mm...@apache.org on 2019/05/21 00:06:43 UTC

[metron] branch master updated: METRON-1788 Batch profiler pull profile information from zookeeper (tigerquoll via mmiklavc) closes apache/metron#1383

This is an automated email from the ASF dual-hosted git repository.

mmiklavcic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b04460  METRON-1788 Batch profiler pull profile information from zookeeper (tigerquoll via mmiklavc) closes apache/metron#1383
3b04460 is described below

commit 3b0446006c327b7dea494b3a0bcbb9de4f662d5a
Author: tigerquoll <ti...@outlook.com>
AuthorDate: Mon May 20 18:05:50 2019 -0600

    METRON-1788 Batch profiler pull profile information from zookeeper (tigerquoll via mmiklavc) closes apache/metron#1383
---
 metron-analytics/metron-profiler-spark/README.md   |  61 ++++++++--
 metron-analytics/metron-profiler-spark/pom.xml     |   6 +
 .../profiler/spark/cli/BatchProfilerCLI.java       | 126 +++++++++++++++++++--
 .../spark/cli/BatchProfilerCLIOptions.java         |  15 ++-
 .../src/main/scripts/start_batch_profiler.sh       |  18 ++-
 .../profiler/spark/cli/BatchProfilerCLITest.java   |  28 ++++-
 .../spark/cli/BatchProfilerZKIntegrationTest.java  |  82 ++++++++++++++
 .../apache/metron/integration/TestZKServer.java    |  79 +++++++++++++
 8 files changed, 395 insertions(+), 20 deletions(-)
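
The rule this commit adds to `BatchProfilerCLI.handleProfileDefinitions` (exactly one of `--profiles` or `--zookeeper` must be given) can be sketched without any Metron or commons-cli dependency. This is a minimal illustration under stated assumptions: the class `ProfileSourceSketch`, the enum `Source`, and the naive scan of the argument list are hypothetical, and `IllegalStateException` stands in for the `MissingOptionException` that the real code throws when neither option is present.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone sketch; not part of the Metron code base. */
class ProfileSourceSketch {

  /** Where the profile definitions would be read from. */
  enum Source { FILE, ZOOKEEPER }

  /**
   * Mirrors the rule in handleProfileDefinitions: exactly one of
   * --profiles / --zookeeper (or the short forms -p / -z) must be present.
   * Assumption: a plain list scan replaces real option parsing.
   */
  static Source chooseSource(List<String> args) {
    boolean hasFile = args.contains("--profiles") || args.contains("-p");
    boolean hasZk = args.contains("--zookeeper") || args.contains("-z");
    String error = "A single profile location (--profiles or --zookeeper) must be specified";
    if (!hasFile && !hasZk) {
      // stand-in for commons-cli's MissingOptionException
      throw new IllegalStateException(error);
    }
    if (hasFile && hasZk) {
      throw new IllegalArgumentException(error);
    }
    return hasZk ? Source.ZOOKEEPER : Source.FILE;
  }

  public static void main(String[] argv) {
    System.out.println(chooseSource(Arrays.asList("--zookeeper", "node1:2181")));
    System.out.println(chooseSource(Arrays.asList("--profiles", "profiler.json")));
  }
}
```

The wrapper script applies the same either/or choice one level up: it passes `--zookeeper` when `$SPARK_PROFILER_USE_ZOOKEEPER` is non-empty and `--profiles` otherwise, so the two options should never reach the CLI together unless the caller adds one by hand.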

diff --git a/metron-analytics/metron-profiler-spark/README.md b/metron-analytics/metron-profiler-spark/README.md
index 5ee8510..8750550 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -42,8 +42,7 @@ The portion of a profile produced by the Batch Profiler should be indistinguisha
 For an introduction to the Profiler, see the [Profiler README](../metron-profiler-common/README.md).
 
 ## Getting Started
-
-1. Create a profile definition by editing `$METRON_HOME/config/zookeeper/profiler.json` as follows.  
+1. If a profile file does not already exist, you can create a profile definition by editing `$METRON_HOME/config/zookeeper/profiler.json` as follows.
 
     ```
     cat $METRON_HOME/config/zookeeper/profiler.json
@@ -60,7 +59,8 @@ For an introduction to the Profiler, see the [Profiler README](../metron-profile
       "timestampField": "timestamp"
     }
     ```
-
+    See [Specifying profiles](#specifying-profiles) for information on how to load profile definitions from zookeeper.
+ 
 1. Ensure that you have archived telemetry available for the Batch Profiler to consume.  By default, Metron will store this in HDFS at `/apps/metron/indexing/indexed/*/*`.
 
     ```
@@ -80,7 +80,6 @@ For an introduction to the Profiler, see the [Profiler README](../metron-profile
 	  ```
 	  log4j.logger.org.apache.metron.profiler.spark=DEBUG
 	  ```
-
 1. Run the Batch Profiler.
 
     ```
@@ -91,6 +90,41 @@ For an introduction to the Profiler, see the [Profiler README](../metron-profile
 
 1. Query for the profile data using the [Profiler Client](../metron-profiler-client/README.md).
 
+## Specifying profiles
+
+The profile to use for batch processing can be supplied either as a JSON file on disk
+or as a profile already loaded into zookeeper for use by the streaming profiler.
+
+### Loading a profile from disk
+
+1. If a profile file does not already exist, you can create a profile definition by editing `$METRON_HOME/config/zookeeper/profiler.json` as follows.
+
+    ```
+    cat $METRON_HOME/config/zookeeper/profiler.json
+    {
+      "profiles": [
+        {
+          "profile": "hello-world",
+          "foreach": "'global'",
+          "init":    { "count": "0" },
+          "update":  { "count": "count + 1" },
+          "result":  "count"
+        }
+      ],
+      "timestampField": "timestamp"
+    }
+    ```
+1.  When launching the batch profiler directly, use the `--profiles <path to profiler.json>` option. 
+If using the wrapper script to launch the batch profiler, it will automatically add the command argument
+`--profiles $METRON_HOME/config/zookeeper/profiler.json` to the batch launching process if `$SPARK_PROFILER_USE_ZOOKEEPER` is unset or empty.
+
+### Loading a profile from zookeeper
+
+Choose to use profiles already loaded into zookeeper (e.g. for use by the streaming profiler) by setting the environment variable `$SPARK_PROFILER_USE_ZOOKEEPER`. 
+This will cause the wrapper script to add `--zookeeper $ZOOKEEPER` to the batch launching process, 
+which will cause the spark profiler to extract profiles from the zookeeper quorum located at `$ZOOKEEPER`. 
+
+
 ## Installation
 
 The Batch Profiler package is installed automatically when installing Metron using the Ambari MPack.  See the following notes when installing the Batch Profiler without the Ambari MPack.
@@ -147,9 +181,11 @@ The Batch Profiler requires Spark version 2.3.0+.
 
 A script located at `$METRON_HOME/bin/start_batch_profiler.sh` has been provided to simplify running the Batch Profiler.  This script makes the following assumptions.
 
-  * The script builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`.
-
+  * The script either 
+       * builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`, or
+       * utilises the profiles already loaded into zookeeper quorum at `$ZOOKEEPER` if the environment variable `$SPARK_PROFILER_USE_ZOOKEEPER` is set.
   * The properties defined in `$METRON_HOME/config/batch-profiler.properties` are passed to both the Profiler and Spark.  You can define both Spark and Profiler properties in this same file.
+  * The script will also configure the event time field to use if the field value is stored in the `${SPARK_PROFILER_EVENT_TIMESTAMP_FIELD}` environment variable.
 
   * The script assumes that Spark is installed at `/usr/hdp/current/spark2-client`.  This can be overridden if you define an environment variable called `SPARK_HOME` prior to executing the script.
 
@@ -171,6 +207,8 @@ The Batch Profiler accepts the following arguments when run from the command lin
 | Argument                              | Description
 |---                                    |---
 | [`-p`, `--profiles`](#--profiles)     | Path to the profile definitions.
+| [`-z`, `--zookeeper`](#--zookeeper)   | Zookeeper quorum to read profile definitions from.
+| [`-t`, `--timestampfield`](#--timestampfield) | Which data field to use for event time.
 | [`-c`, `--config`](#--config)         | Path to the profiler properties file.
 | [`-g`, `--globals`](#--globals)       | Path to the Stellar global config file.
 | [`-r`, `--reader`](#--reader)         | Path to properties for the DataFrameReader.
@@ -178,7 +216,16 @@ The Batch Profiler accepts the following arguments when run from the command lin
 
 #### `--profiles`
 
-The path to a file containing the profile definition in JSON.
+The path to a file containing the profile definition in JSON. Only one of `--zookeeper` or `--profiles` should be used.
+
+#### `--zookeeper`
+
+Read profile definitions from the zookeeper quorum at this address. Only one of `--zookeeper` or `--profiles` should be used.
+
+#### `--timestampfield`
+
+Specifies which data field to use for event time information. The field to use for event time is usually stored as part of the profile.
+It can be overridden via this setting.
 
 #### `--config`
 
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 40bd551..8b8d510 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -144,6 +144,12 @@
             <version>${global_log4j_core_version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-integration-test</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
index 29fe4a2..9171f01 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
@@ -17,15 +17,22 @@
  *  limitations under the License.
  *
  */
+
 package org.apache.metron.profiler.spark.cli;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
 import org.apache.metron.profiler.spark.BatchProfiler;
+import org.apache.metron.zookeeper.ZKCache;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
@@ -35,19 +42,23 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
+import java.util.Optional;
 import java.util.Properties;
 
-import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.GLOBALS_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_DEFN_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_TIMESTAMP_FLD;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_ZK;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.READER_PROPS_FILE;
 import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse;
 
 /**
- * The main entry point which launches the Batch Profiler in Spark.
- *
+ * The main entry point which launches the Batch Profiler in Spark.
+ * Profiles can be read from either files (utilising --profiles)
+ * or zookeeper (utilising --zookeeper)
  * With this class the Batch Profiler can be submitted using the following command.
- *
+ * <p></p>
  * <pre>{@code
  *  $SPARK_HOME/bin/spark-submit \
  *    --class org.apache.metron.profiler.spark.cli.BatchProfilerCLI \
@@ -58,6 +69,17 @@ import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse
  *     --profiles profiles.json \
  *     --reader reader.properties
  * }</pre>
+ * <p></p>
+ *  Or to pull the profile information from zookeeper
+ *  <pre>{@code
+ *   $SPARK_HOME/bin/spark-submit \
+ *     --class org.apache.metron.profiler.spark.cli.BatchProfilerCLI \
+ *     --properties-file spark.properties \
+ *     metron-profiler-spark-<version>.jar \
+ *     --globals global.properties \
+ *     --zookeeper ZookeeperQuorumForProfiles \
+ *     --reader reader.properties
+ *  }</pre>
  */
 public class BatchProfilerCLI implements Serializable {
 
@@ -68,17 +90,19 @@ public class BatchProfilerCLI implements Serializable {
   public static Properties readerProps;
   public static ProfilerConfig profiles;
 
-  public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException {
+  public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException, Exception {
     // parse the command line
     CommandLine commandLine = parseCommandLine(args);
+
+    // read profile information
+    profiles = Preconditions.checkNotNull(handleProfileDefinitions(commandLine), "An error occurred while reading profile data");
     profilerProps = handleProfilerProperties(commandLine);
     globals = handleGlobals(commandLine);
-    profiles = handleProfileDefinitions(commandLine);
     readerProps = handleReaderProperties(commandLine);
 
     // the batch profiler must use 'event time'
     if(!profiles.getTimestampField().isPresent()) {
-      throw new IllegalArgumentException("The Batch Profiler must use event time. The 'timestampField' must be defined.");
+      throw new IllegalArgumentException("The Batch Profiler must use event time. The 'timestampField' must be defined in the profile definitions file or via the --timestampfield argument.");
     }
 
     // one or more profiles must be defined
@@ -97,6 +121,54 @@ public class BatchProfilerCLI implements Serializable {
   }
 
   /**
+   * Extracts profile information from a file or from zookeeper
+   * @param commandLine Command line information.
+   * @return Profile information
+   * @throws MissingOptionException if command line options are missing
+   * @throws IOException If there are disk or network issues retrieving profiles
+   */
+  private static ProfilerConfig handleProfileDefinitions(CommandLine commandLine) throws MissingOptionException, IOException {
+    final String PROFILE_LOCATION_ERROR =
+            "A single profile location (--profiles or --zookeeper) must be specified";
+    ProfilerConfig profiles;
+
+    if ((!PROFILE_ZK.has(commandLine)) && (!PROFILE_DEFN_FILE.has(commandLine))) {
+      throw new MissingOptionException(PROFILE_LOCATION_ERROR);
+    }
+    if (PROFILE_ZK.has(commandLine) && PROFILE_DEFN_FILE.has(commandLine)) {
+      throw new IllegalArgumentException(PROFILE_LOCATION_ERROR);
+    }
+
+    if (PROFILE_ZK.has(commandLine)) {
+      profiles = handleProfileDefinitionsZK(commandLine);
+    } else {
+      profiles = handleProfileDefinitionsFile(commandLine);
+    }
+
+    // event time can be specified via command line override
+    if (PROFILE_TIMESTAMP_FLD.has(commandLine)) {
+      final String timestampField = PROFILE_TIMESTAMP_FLD.get(commandLine);
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(timestampField), "timestampField must not be empty if specified");
+      profiles.setTimestampField(timestampField);
+    }
+    LOG.info("Utilising profile: {}", profiles.toString());
+    return profiles;
+  }
+
+  /**
+   * Creates and starts a Zookeeper client for the given quorum.
+   * @param zkQuorum Address of zookeeper server
+   * @return Started CuratorFramework client
+   */
+  private static CuratorFramework createZKClient(final String zkQuorum) {
+    LOG.info("Loading profiler properties from zookeeper quorum '{}'", zkQuorum);
+    final CuratorFramework zkClient = ZKCache.createClient(zkQuorum, Optional.empty());
+    zkClient.start();
+    LOG.info("Zookeeper client created successfully");
+    return zkClient;
+  }
+
+  /**
    * Load the Stellar globals from a file.
    *
    * @param commandLine The command line.
@@ -155,7 +227,7 @@ public class BatchProfilerCLI implements Serializable {
    *
    * @param commandLine The command line.
    */
-  private static ProfilerConfig handleProfileDefinitions(CommandLine commandLine) throws IOException {
+  private static ProfilerConfig handleProfileDefinitionsFile(CommandLine commandLine) throws IOException {
     ProfilerConfig profiles;
     if(PROFILE_DEFN_FILE.has(commandLine)) {
       String profilePath = PROFILE_DEFN_FILE.get(commandLine);
@@ -173,9 +245,45 @@ public class BatchProfilerCLI implements Serializable {
   }
 
   /**
+   * Load the profile definitions from ZK server identified in command line
+   * @param commandLine Command line containing the Zookeeper quorum address
+   * @return ProfilerConfig object stored in zookeeper
+   * @throws IOException if error occurs during zookeeper read
+   */
+  private static ProfilerConfig handleProfileDefinitionsZK(final CommandLine commandLine) throws IOException  {
+    Preconditions.checkArgument(PROFILE_ZK.has(commandLine));
+    ProfilerConfig profiles;
+    final String zkQuorum = PROFILE_ZK.get(commandLine);
+    try (final CuratorFramework zkClient = createZKClient(zkQuorum)) {
+      profiles = readProfileFromZK(zkClient);
+    }
+    return profiles;
+  }
+
+  /**
+   * Reads profile information utilizing the passed zookeeper client
+   * @param zkClient Started zookeeper client
+   * @throws IOException if error occurs while reading profile information from zookeeper
+   */
+  static ProfilerConfig readProfileFromZK(CuratorFramework zkClient) throws IOException {
+    ProfilerConfig profiles;
+    try {
+      LOG.info("Loading profiles from zookeeper");
+      profiles = ConfigurationsUtils.readProfilerConfigFromZookeeper(zkClient);
+      LOG.info("Loaded {} profile(s)", profiles.getProfiles().size());
+    } catch (Exception ex) {
+      throw new IOException(
+              String.format("Error reading configuration from Zookeeper client %s",
+                      zkClient.toString()),
+              ex);
+    }
+    return profiles;
+  }
+
+  /**
    * Parse the command line arguments submitted by the user.
    * @param args The command line arguments to parse.
-   * @throws org.apache.commons.cli.ParseException
+   * @throws org.apache.commons.cli.ParseException if command line has errors
    */
   private static CommandLine parseCommandLine(String[] args) throws ParseException {
     CommandLineParser parser = new PosixParser();
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
index d58728a..5d66d3b 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
@@ -17,6 +17,7 @@
  *  limitations under the License.
  *
  */
+
 package org.apache.metron.profiler.spark.cli;
 
 import com.google.common.base.Joiner;
@@ -34,10 +35,22 @@ import java.util.function.Supplier;
  * Profiler.
  */
 public enum BatchProfilerCLIOptions {
+  PROFILE_ZK(() -> {
+    Option o = new Option("z", "zookeeper", true, "Zookeeper quorum for profile definitions");
+    o.setRequired(false);
+    return o;
+  }),
+
+  PROFILE_TIMESTAMP_FLD(() -> {
+    Option o = new Option("t", "timestampfield", true,
+            "The name of a field to source event time from");
+    o.setRequired(false);
+    return o;
+  }),
 
   PROFILE_DEFN_FILE(() -> {
     Option o = new Option("p", "profiles", true, "Path to the profile definitions.");
-    o.setRequired(true);
+    o.setRequired(false);
     return o;
   }),
 
diff --git a/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh b/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh
index c489af7..33d9aa4 100644
--- a/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh
+++ b/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh
@@ -21,12 +21,26 @@ METRON_HOME=/usr/metron/${METRON_VERSION}
 PROFILER_JAR=${METRON_HOME}/lib/${project.artifactId}-${METRON_VERSION}.jar
 MAIN_CLASS=org.apache.metron.profiler.spark.cli.BatchProfilerCLI
 PROFILER_PROPS=${PROFILER_PROPS:-"${METRON_HOME}/config/batch-profiler.properties"}
-PROFILES_FILE=${PROFILES:-"${METRON_HOME}/config/zookeeper/profiler.json"}
 SPARK_HOME=${SPARK_HOME:-"/usr/hdp/current/spark2-client"}
 
+PROFILES_FILE=${PROFILES:-"${METRON_HOME}/config/zookeeper/profiler.json"}
+ZOOKEEPER_LOCATION=${ZOOKEEPER:-"node1:2181"}
+
+# allow for an override on event time source via environment variable
+if [ -n "$SPARK_PROFILER_EVENT_TIMESTAMP_FIELD" ]; then
+  EVENT_TIMESTAMP="--timestampfield ${SPARK_PROFILER_EVENT_TIMESTAMP_FIELD}"
+fi
+
+if [ -n "$SPARK_PROFILER_USE_ZOOKEEPER" ]; then
+  PROFILES_LOCATION="--zookeeper ${ZOOKEEPER_LOCATION}"
+else
+  PROFILES_LOCATION="--profiles ${PROFILES_FILE}"
+fi
+
 ${SPARK_HOME}/bin/spark-submit \
     --class ${MAIN_CLASS} \
     --properties-file ${PROFILER_PROPS} \
     ${PROFILER_JAR} \
     --config ${PROFILER_PROPS} \
-    --profiles ${PROFILES_FILE}
+    ${PROFILES_LOCATION} ${EVENT_TIMESTAMP} \
+    "$@"
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java
index c27495e..5be195a 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java
@@ -19,6 +19,7 @@
 
 package org.apache.metron.profiler.spark.cli;
 
+import org.apache.commons.cli.MissingArgumentException;
 import org.apache.commons.cli.MissingOptionException;
 import org.junit.Test;
 
@@ -40,7 +41,8 @@ public class BatchProfilerCLITest {
   }
 
   /**
-   * The user must define the -p, --profiles option.  The Profiler cannot work without profiles.
+   * The user must define one of the -p, --profiles or -z, --zookeeper options.
+   * The Profiler cannot work without profiles.
    */
   @Test(expected = MissingOptionException.class)
   public void mustDefineProfilesOption() throws Exception {
@@ -49,6 +51,30 @@ public class BatchProfilerCLITest {
   }
 
   /**
+   * The user must define only one of the -p, --profiles or -z, --zookeeper options.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void mustDefineOnlyOneProfilesOption() throws Exception {
+    String[] args = new String[] {
+            "--profiles", "src/test/resources/profiles-no-timestamp-field.json",
+            "--zookeeper", "node1:2181"
+    };
+    BatchProfilerCLI.main(args);
+  }
+
+  /**
+   * If a timestamp option is given, it must contain a field name
+   */
+  @Test(expected = MissingArgumentException.class)
+  public void mustDefineFieldnametoGoWithTimestamp() throws Exception {
+    String[] args = new String[] {
+            "--timestampfield"
+    };
+    BatchProfilerCLI.main(args);
+  }
+
+
+  /**
    * If the profile definition contains no valid profiles, we have a problem.
    */
   @Test(expected = IllegalArgumentException.class)
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerZKIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerZKIntegrationTest.java
new file mode 100644
index 0000000..d77e775
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerZKIntegrationTest.java
@@ -0,0 +1,82 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.cli;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.integration.TestZKServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class BatchProfilerZKIntegrationTest {
+  /**
+   * {
+   *   "profiles": [
+   *      {
+   *        "profile": "profile1",
+   *        "foreach": "ip_src_addr",
+   *        "init":   { "count": "0" },
+   *        "update": { "count": "count + 1" },
+   *        "result":   "count"
+   *      }
+   *   ]
+   * }
+   */
+  @Multiline
+  private String profile;
+
+  @Test
+  public void testProfilerZookeeperIntegration() throws Exception {
+    final byte[] profileExpectedByte = profile.getBytes(StandardCharsets.UTF_8);
+    final ProfilerConfig expectedProfileConfig = ProfilerConfig.fromBytes(profileExpectedByte);
+
+    TestZKServer.runWithZK( (zkServer, zkClient) -> {
+      // write bytes to zookeeper
+      ConfigurationsUtils.writeProfilerConfigToZookeeper(profileExpectedByte, zkClient);
+
+      // read bytes from zookeeper utilizing Batch Profiler functions
+      final ProfilerConfig profiles = BatchProfilerCLI.readProfileFromZK(zkClient);
+
+      // compare expected values
+      Assert.assertEquals("Profile read from zookeeper has changes", expectedProfileConfig, profiles);
+    });
+  }
+
+  @Test
+  public void testProfileZookeeperIntegrationFails() throws Exception {
+    final byte[] profileExpectedByte = profile.getBytes(StandardCharsets.UTF_8);
+    final ProfilerConfig expectedProfileConfig = ProfilerConfig.fromBytes(profileExpectedByte);
+    expectedProfileConfig.setTimestampField("foobar");
+
+    TestZKServer.runWithZK( (zkServer, zkClient) -> {
+      // write bytes to zookeeper
+      ConfigurationsUtils.writeProfilerConfigToZookeeper(profileExpectedByte, zkClient);
+
+      // read bytes from zookeeper utilizing Batch Profiler functions
+      final ProfilerConfig profiles = BatchProfilerCLI.readProfileFromZK(zkClient);
+
+      // compare expected values
+      Assert.assertNotEquals("Profile zookeeper integration test fails to detect change", expectedProfileConfig, profiles);
+    });
+  }
+}
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/TestZKServer.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/TestZKServer.java
new file mode 100644
index 0000000..9ce02e4
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/TestZKServer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.integration;
+
+import java.io.Closeable;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.stellar.common.configuration.ConfigurationsUtils;
+
+
+/**
+ * Closeable wrapper around ZKServerComponent so it can be cleanly used in a resource protection block
+ */
+public class TestZKServer implements Closeable {
+  private ZKServerComponent testZkServer;
+  private String zookeeperUrl;
+
+  /**
+   * BiConsuming interface that allows Exceptions to be thrown
+   */
+  @FunctionalInterface
+  public interface ThrowingBiConsumer<T, U> {
+    void accept(T t, U u) throws Exception;
+  }
+
+  /**
+   * Utility method to allow lambdas to automatically be fed a started Zookeeper client and server
+   * which are automatically cleaned up after the lambda finishes running or throws an exception
+   * @param testFunc  Lambda containing the code to run with the zookeeper client/server
+   * @throws Exception Any exceptions thrown by the 'testFunc' lambda will bubble up the call chain
+   */
+  static public void runWithZK(ThrowingBiConsumer<TestZKServer,CuratorFramework> testFunc) throws Exception {
+    try (TestZKServer zkServer = new TestZKServer();
+         CuratorFramework zkClient = zkServer.newClient()) {
+      zkClient.start();
+      testFunc.accept(zkServer, zkClient);
+    }
+  }
+
+  public TestZKServer() throws UnableToStartException {
+    testZkServer = new ZKServerComponent();
+    testZkServer.start();
+    zookeeperUrl = testZkServer.getConnectionString();
+  }
+
+  public String getZookeeperUrl() {
+    return zookeeperUrl;
+  }
+
+  /**
+   * Create a new zookeeper client configured to use our test Zookeeper server
+   * @return CuratorFramework client
+   */
+  public CuratorFramework newClient() {
+    return ConfigurationsUtils.getClient(zookeeperUrl);
+  }
+
+  @Override
+  public void close() {
+    testZkServer.stop();
+    testZkServer.reset();
+  }
+}
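
The `runWithZK` helper above pairs a throwing functional interface with try-with-resources so that the test body can throw freely while the server and client are still cleaned up. A minimal sketch of that pattern, using only the JDK: the class `RunWithResourcesSketch`, its `Resource` type, and the `events` list are hypothetical stand-ins for `TestZKServer` and `CuratorFramework`, and the sketch omits the `zkClient.start()` call that the real helper makes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone sketch of the runWithZK pattern; JDK only. */
class RunWithResourcesSketch {

  /** Like java.util.function.BiConsumer, but the body may throw. */
  @FunctionalInterface
  interface ThrowingBiConsumer<T, U> {
    void accept(T t, U u) throws Exception;
  }

  /** Stand-in for a server or client; records its lifecycle events. */
  static class Resource implements AutoCloseable {
    private final String name;
    private final List<String> events;

    Resource(String name, List<String> events) {
      this.name = name;
      this.events = events;
      events.add("open " + name);
    }

    @Override
    public void close() {
      events.add("close " + name);
    }
  }

  /** Opens both resources, runs the body, and closes them in reverse order. */
  static void runWith(List<String> events,
                      ThrowingBiConsumer<Resource, Resource> body) throws Exception {
    try (Resource server = new Resource("server", events);
         Resource client = new Resource("client", events)) {
      body.accept(server, client);
    }
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    try {
      runWith(events, (server, client) -> {
        throw new IllegalStateException("test failed");
      });
    } catch (Exception e) {
      events.add("caught " + e.getMessage());
    }
    // prints [open server, open client, close client, close server, caught test failed]
    System.out.println(events);
  }
}
```

Because try-with-resources closes resources in the reverse of their declaration order, the client is released before the server, and both are released before the exception reaches the caller.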