Posted to commits@metron.apache.org by mm...@apache.org on 2019/05/21 00:06:43 UTC
[metron] branch master updated: METRON-1788 Batch profiler pull
profile information from zookeeper (tigerquoll via mmiklavc) closes
apache/metron#1383
This is an automated email from the ASF dual-hosted git repository.
mmiklavcic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 3b04460 METRON-1788 Batch profiler pull profile information from zookeeper (tigerquoll via mmiklavc) closes apache/metron#1383
3b04460 is described below
commit 3b0446006c327b7dea494b3a0bcbb9de4f662d5a
Author: tigerquoll <ti...@outlook.com>
AuthorDate: Mon May 20 18:05:50 2019 -0600
METRON-1788 Batch profiler pull profile information from zookeeper (tigerquoll via mmiklavc) closes apache/metron#1383
---
metron-analytics/metron-profiler-spark/README.md | 61 ++++++++--
metron-analytics/metron-profiler-spark/pom.xml | 6 +
.../profiler/spark/cli/BatchProfilerCLI.java | 126 +++++++++++++++++++--
.../spark/cli/BatchProfilerCLIOptions.java | 15 ++-
.../src/main/scripts/start_batch_profiler.sh | 18 ++-
.../profiler/spark/cli/BatchProfilerCLITest.java | 28 ++++-
.../spark/cli/BatchProfilerZKIntegrationTest.java | 82 ++++++++++++++
.../apache/metron/integration/TestZKServer.java | 79 +++++++++++++
8 files changed, 395 insertions(+), 20 deletions(-)
diff --git a/metron-analytics/metron-profiler-spark/README.md b/metron-analytics/metron-profiler-spark/README.md
index 5ee8510..8750550 100644
--- a/metron-analytics/metron-profiler-spark/README.md
+++ b/metron-analytics/metron-profiler-spark/README.md
@@ -42,8 +42,7 @@ The portion of a profile produced by the Batch Profiler should be indistinguisha
For an introduction to the Profiler, see the [Profiler README](../metron-profiler-common/README.md).
## Getting Started
-
-1. Create a profile definition by editing `$METRON_HOME/config/zookeeper/profiler.json` as follows.
+1. If a profile file does not already exist, you can create a profile definition by editing `$METRON_HOME/config/zookeeper/profiler.json` as follows.
```
cat $METRON_HOME/config/zookeeper/profiler.json
@@ -60,7 +59,8 @@ For an introduction to the Profiler, see the [Profiler README](../metron-profile
"timestampField": "timestamp"
}
```
-
+ See [Specifying profiles](#specifying-profiles) for information on how to load profile definitions from zookeeper.
+
1. Ensure that you have archived telemetry available for the Batch Profiler to consume. By default, Metron will store this in HDFS at `/apps/metron/indexing/indexed/*/*`.
```
@@ -80,7 +80,6 @@ For an introduction to the Profiler, see the [Profiler README](../metron-profile
```
log4j.logger.org.apache.metron.profiler.spark=DEBUG
```
-
1. Run the Batch Profiler.
```
@@ -91,6 +90,41 @@ For an introduction to the Profiler, see the [Profiler README](../metron-profile
1. Query for the profile data using the [Profiler Client](../metron-profiler-client/README.md).
+## Specifying profiles
+
+The profile to use for batch processing can be specified either as a JSON file on disk
+or as a profile already loaded into zookeeper for use by the streaming profiler.
+
+### Loading a profile from disk
+
+1. If a profile file does not already exist, you can create a profile definition by editing `$METRON_HOME/config/zookeeper/profiler.json` as follows.
+
+ ```
+ cat $METRON_HOME/config/zookeeper/profiler.json
+ {
+ "profiles": [
+ {
+ "profile": "hello-world",
+ "foreach": "'global'",
+ "init": { "count": "0" },
+ "update": { "count": "count + 1" },
+ "result": "count"
+ }
+ ],
+ "timestampField": "timestamp"
+ }
+ ```
+1. When launching the batch profiler directly, use the `--profiles <path to profiler.json>` option.
+If you launch the batch profiler with the wrapper script, the script automatically adds the argument
+`--profiles $METRON_HOME/config/zookeeper/profiler.json` to the launch command if `$SPARK_PROFILER_USE_ZOOKEEPER` is not set.
+
+### Loading a profile from zookeeper
+
+To use profiles already loaded into zookeeper (e.g. for use by the streaming profiler), set the environment variable `$SPARK_PROFILER_USE_ZOOKEEPER`.
+The wrapper script then adds `--zookeeper $ZOOKEEPER` to the launch command,
+which causes the Spark profiler to read profiles from the zookeeper quorum located at `$ZOOKEEPER`.
+
+
## Installation
The Batch Profiler package is installed automatically when installing Metron using the Ambari MPack. See the following notes when installing the Batch Profiler without the Ambari MPack.
@@ -147,9 +181,11 @@ The Batch Profiler requires Spark version 2.3.0+.
A script located at `$METRON_HOME/bin/start_batch_profiler.sh` has been provided to simplify running the Batch Profiler. This script makes the following assumptions.
- * The script builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`.
-
+ * The script either
+ * builds the profiles defined in `$METRON_HOME/config/zookeeper/profiler.json`, or
+ * uses the profiles already loaded into the zookeeper quorum at `$ZOOKEEPER` if the environment variable `$SPARK_PROFILER_USE_ZOOKEEPER` is set.
* The properties defined in `$METRON_HOME/config/batch-profiler.properties` are passed to both the Profiler and Spark. You can define both Spark and Profiler properties in this same file.
+ * The script also overrides the event time field if the `SPARK_PROFILER_EVENT_TIMESTAMP_FIELD` environment variable is set to a field name.
* The script assumes that Spark is installed at `/usr/hdp/current/spark2-client`. This can be overridden if you define an environment variable called `SPARK_HOME` prior to executing the script.
@@ -171,6 +207,8 @@ The Batch Profiler accepts the following arguments when run from the command lin
| Argument | Description
|--- |---
| [`-p`, `--profiles`](#--profiles) | Path to the profile definitions.
+| [`-z`, `--zookeeper`](#--zookeeper) | Zookeeper quorum to read profile definitions from.
+| [`-t`, `--timestampfield`](#--timestampfield) | Which data field to use for event time.
| [`-c`, `--config`](#--config) | Path to the profiler properties file.
| [`-g`, `--globals`](#--globals) | Path to the Stellar global config file.
| [`-r`, `--reader`](#--reader) | Path to properties for the DataFrameReader.
@@ -178,7 +216,16 @@ The Batch Profiler accepts the following arguments when run from the command lin
#### `--profiles`
-The path to a file containing the profile definition in JSON.
+The path to a file containing the profile definition in JSON. Only one of `--zookeeper` or `--profiles` should be used.
+
+#### `--zookeeper`
+
+Read profile definitions from the zookeeper quorum at this address. Only one of `--zookeeper` or `--profiles` should be used.
+
+#### `--timestampfield`
+
+Specifies which data field to use for event time information. The field to use for event time is usually stored as part of the profile.
+It can be overridden via this setting.
#### `--config`
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index 40bd551..8b8d510 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -144,6 +144,12 @@
<version>${global_log4j_core_version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-integration-test</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
index 29fe4a2..9171f01 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLI.java
@@ -17,15 +17,22 @@
* limitations under the License.
*
*/
+
package org.apache.metron.profiler.spark.cli;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.profiler.spark.BatchProfiler;
+import org.apache.metron.zookeeper.ZKCache;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
@@ -35,19 +42,23 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
+import java.util.Optional;
import java.util.Properties;
-import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.GLOBALS_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILER_PROPS_FILE;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_DEFN_FILE;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_TIMESTAMP_FLD;
+import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.PROFILE_ZK;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.READER_PROPS_FILE;
import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse;
/**
- * The main entry point which launches the Batch Profiler in Spark.
- *
+ * The main entry point which launches the Batch Profiler in Spark.
+ * Profiles can be read from either files (utilising --profiles)
+ * or zookeeper (utilising --zookeeper)
* With this class the Batch Profiler can be submitted using the following command.
- *
+ * <p></p>
* <pre>{@code
* $SPARK_HOME/bin/spark-submit \
* --class org.apache.metron.profiler.spark.cli.BatchProfilerCLI \
@@ -58,6 +69,17 @@ import static org.apache.metron.profiler.spark.cli.BatchProfilerCLIOptions.parse
* --profiles profiles.json \
* --reader reader.properties
* }</pre>
+ * <p></p>
+ * Or to pull the profile information from zookeeper
+ * <pre>{@code
+ * $SPARK_HOME/bin/spark-submit \
+ * --class org.apache.metron.profiler.spark.cli.BatchProfilerCLI \
+ * --properties-file spark.properties \
+ * metron-profiler-spark-<version>.jar \
+ * --globals global.properties \
+ * --zookeeper ZookeeperQuorumForProfiles \
+ * --reader reader.properties
+ * }</pre>
*/
public class BatchProfilerCLI implements Serializable {
@@ -68,17 +90,19 @@ public class BatchProfilerCLI implements Serializable {
public static Properties readerProps;
public static ProfilerConfig profiles;
- public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException {
+ public static void main(String[] args) throws IOException, org.apache.commons.cli.ParseException, Exception {
// parse the command line
CommandLine commandLine = parseCommandLine(args);
+
+ // read profile information
+ profiles = Preconditions.checkNotNull(handleProfileDefinitions(commandLine), "An error occurred while reading profile data");
profilerProps = handleProfilerProperties(commandLine);
globals = handleGlobals(commandLine);
- profiles = handleProfileDefinitions(commandLine);
readerProps = handleReaderProperties(commandLine);
// the batch profiler must use 'event time'
if(!profiles.getTimestampField().isPresent()) {
- throw new IllegalArgumentException("The Batch Profiler must use event time. The 'timestampField' must be defined.");
+ throw new IllegalArgumentException("The Batch Profiler must use event time. The 'timestampField' must be defined in the profile definitions file or via the --timestampfield argument.");
}
// one or more profiles must be defined
@@ -97,6 +121,54 @@ public class BatchProfilerCLI implements Serializable {
}
/**
+ * Extracts profile information from a file or from zookeeper
+ * @param commandLine Command line information.
+ * @return Profile information
+ * @throws MissingOptionException if command line options are missing
+ * @throws IOException If there are disk or network issues retrieving profiles
+ */
+ private static ProfilerConfig handleProfileDefinitions(CommandLine commandLine) throws MissingOptionException, IOException {
+ final String PROFILE_LOCATION_ERROR =
+ "A single profile location (--profiles or --zookeeper) must be specified";
+ ProfilerConfig profiles;
+
+ if ((!PROFILE_ZK.has(commandLine)) && (!PROFILE_DEFN_FILE.has(commandLine))) {
+ throw new MissingOptionException(PROFILE_LOCATION_ERROR);
+ }
+ if (PROFILE_ZK.has(commandLine) && PROFILE_DEFN_FILE.has(commandLine)) {
+ throw new IllegalArgumentException(PROFILE_LOCATION_ERROR);
+ }
+
+ if (PROFILE_ZK.has(commandLine)) {
+ profiles = handleProfileDefinitionsZK(commandLine);
+ } else {
+ profiles = handleProfileDefinitionsFile(commandLine);
+ }
+
+ // event time can be specified via a command line override
+ if (PROFILE_TIMESTAMP_FLD.has(commandLine)) {
+ final String timestampField = PROFILE_TIMESTAMP_FLD.get(commandLine);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(timestampField), "timestampField must not be empty if specified");
+ profiles.setTimestampField(timestampField);
+ }
+ LOG.info("Utilising profile: {}", profiles.toString());
+ return profiles;
+ }
+
+ /**
+ * Creates and starts a Zookeeper client for the given quorum.
+ * @param zkQuorum Address of the zookeeper quorum
+ * @return Started CuratorFramework client
+ */
+ private static CuratorFramework createZKClient(final String zkQuorum) {
+ LOG.info("Loading profiler properties from zookeeper quorum '{}'", zkQuorum);
+ final CuratorFramework zkClient = ZKCache.createClient(zkQuorum, Optional.empty());
+ zkClient.start();
+ LOG.info("Zookeeper client created successfully");
+ return zkClient;
+ }
+
+ /**
* Load the Stellar globals from a file.
*
* @param commandLine The command line.
@@ -155,7 +227,7 @@ public class BatchProfilerCLI implements Serializable {
*
* @param commandLine The command line.
*/
- private static ProfilerConfig handleProfileDefinitions(CommandLine commandLine) throws IOException {
+ private static ProfilerConfig handleProfileDefinitionsFile(CommandLine commandLine) throws IOException {
ProfilerConfig profiles;
if(PROFILE_DEFN_FILE.has(commandLine)) {
String profilePath = PROFILE_DEFN_FILE.get(commandLine);
@@ -173,9 +245,45 @@ public class BatchProfilerCLI implements Serializable {
}
/**
+ * Load the profile definitions from the ZK quorum identified on the command line
+ * @param commandLine The command line, which must contain the --zookeeper option
+ * @return ProfilerConfig object stored in zookeeper
+ * @throws IOException if error occurs during zookeeper read
+ */
+ private static ProfilerConfig handleProfileDefinitionsZK(final CommandLine commandLine) throws IOException {
+ Preconditions.checkArgument(PROFILE_ZK.has(commandLine));
+ ProfilerConfig profiles;
+ final String zkQuorum = PROFILE_ZK.get(commandLine);
+ try (final CuratorFramework zkClient = createZKClient(zkQuorum)) {
+ profiles = readProfileFromZK(zkClient);
+ }
+ return profiles;
+ }
+
+ /**
+ * Reads profile information utilizing the passed zookeeper client
+ * @param zkClient Started zookeeper client
+ * @throws IOException if error occurs while reading profile information from zookeeper
+ */
+ static ProfilerConfig readProfileFromZK(CuratorFramework zkClient) throws IOException {
+ ProfilerConfig profiles;
+ try {
+ LOG.info("Loading profiles from zookeeper");
+ profiles = ConfigurationsUtils.readProfilerConfigFromZookeeper(zkClient);
+ LOG.info("Loaded {} profile(s)", profiles.getProfiles().size());
+ } catch (Exception ex) {
+ throw new IOException(
+ String.format("Error reading configuration from Zookeeper client %s",
+ zkClient.toString()),
+ ex);
+ }
+ return profiles;
+ }
+
+ /**
* Parse the command line arguments submitted by the user.
* @param args The command line arguments to parse.
- * @throws org.apache.commons.cli.ParseException
+ * @throws org.apache.commons.cli.ParseException if command line has errors
*/
private static CommandLine parseCommandLine(String[] args) throws ParseException {
CommandLineParser parser = new PosixParser();
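
Reviewer note: the option handling added to BatchProfilerCLI above can be summarised in a small, dependency-free sketch. It is not the committed code. The class name `ProfileSourceSketch`, the `Source` enum, and both method names are hypothetical, and `IllegalStateException` stands in for the commons-cli `MissingOptionException` so the example compiles against the JDK alone.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the option checks in handleProfileDefinitions. */
final class ProfileSourceSketch {

  enum Source { FILE, ZOOKEEPER }

  /**
   * Mirrors the rule in the diff: exactly one of --profiles or --zookeeper.
   * IllegalStateException is an assumption standing in for MissingOptionException.
   */
  static Source resolve(Set<String> options) {
    boolean file = options.contains("profiles");
    boolean zk = options.contains("zookeeper");
    if (!file && !zk) {
      throw new IllegalStateException("one of --profiles or --zookeeper is required");
    }
    if (file && zk) {
      throw new IllegalArgumentException("only one of --profiles or --zookeeper is allowed");
    }
    return zk ? Source.ZOOKEEPER : Source.FILE;
  }

  /** A command-line timestamp field, when given, replaces the one stored in the profile. */
  static String effectiveTimestampField(String fromProfile, String fromCommandLine) {
    if (fromCommandLine == null) {
      return fromProfile; // no override requested
    }
    if (fromCommandLine.isEmpty()) {
      throw new IllegalArgumentException("timestamp field must not be empty if specified");
    }
    return fromCommandLine;
  }

  public static void main(String[] args) {
    System.out.println(resolve(new HashSet<>(Arrays.asList("zookeeper"))));
    System.out.println(effectiveTimestampField("timestamp", null));
  }
}
```

The two failure cases use different exception types on purpose, matching the split that the new tests in BatchProfilerCLITest check for.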
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
index d58728a..5d66d3b 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLIOptions.java
@@ -17,6 +17,7 @@
* limitations under the License.
*
*/
+
package org.apache.metron.profiler.spark.cli;
import com.google.common.base.Joiner;
@@ -34,10 +35,22 @@ import java.util.function.Supplier;
* Profiler.
*/
public enum BatchProfilerCLIOptions {
+ PROFILE_ZK(() -> {
+ Option o = new Option("z", "zookeeper", true, "Zookeeper quorum for profile definitions");
+ o.setRequired(false);
+ return o;
+ }),
+
+ PROFILE_TIMESTAMP_FLD(() -> {
+ Option o = new Option("t", "timestampfield", true,
+ "The name of a field to source event time from");
+ o.setRequired(false);
+ return o;
+ }),
PROFILE_DEFN_FILE(() -> {
Option o = new Option("p", "profiles", true, "Path to the profile definitions.");
- o.setRequired(true);
+ o.setRequired(false);
return o;
}),
diff --git a/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh b/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh
index c489af7..33d9aa4 100644
--- a/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh
+++ b/metron-analytics/metron-profiler-spark/src/main/scripts/start_batch_profiler.sh
@@ -21,12 +21,26 @@ METRON_HOME=/usr/metron/${METRON_VERSION}
PROFILER_JAR=${METRON_HOME}/lib/${project.artifactId}-${METRON_VERSION}.jar
MAIN_CLASS=org.apache.metron.profiler.spark.cli.BatchProfilerCLI
PROFILER_PROPS=${PROFILER_PROPS:-"${METRON_HOME}/config/batch-profiler.properties"}
-PROFILES_FILE=${PROFILES:-"${METRON_HOME}/config/zookeeper/profiler.json"}
SPARK_HOME=${SPARK_HOME:-"/usr/hdp/current/spark2-client"}
+PROFILES_FILE=${PROFILES:-"${METRON_HOME}/config/zookeeper/profiler.json"}
+ZOOKEEPER_LOCATION=${ZOOKEEPER:-"node1:2181"}
+
+# allow for an override on event time source via environment variable
+if [ -n "$SPARK_PROFILER_EVENT_TIMESTAMP_FIELD" ]; then
+ EVENT_TIMESTAMP="--timestampfield ${SPARK_PROFILER_EVENT_TIMESTAMP_FIELD}"
+fi
+
+if [ -n "$SPARK_PROFILER_USE_ZOOKEEPER" ]; then
+ PROFILES_LOCATION="--zookeeper ${ZOOKEEPER_LOCATION}"
+else
+ PROFILES_LOCATION="--profiles ${PROFILES_FILE}"
+fi
+
${SPARK_HOME}/bin/spark-submit \
--class ${MAIN_CLASS} \
--properties-file ${PROFILER_PROPS} \
${PROFILER_JAR} \
--config ${PROFILER_PROPS} \
- --profiles ${PROFILES_FILE}
+ ${PROFILES_LOCATION} ${EVENT_TIMESTAMP} \
+ "$@"
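
Reviewer note: the argument selection performed by start_batch_profiler.sh above can be restated as a minimal Java sketch, which may help when reasoning about which flags reach BatchProfilerCLI. The class `LauncherArgsSketch` and its methods are hypothetical. The environment variable names and default values are taken from the script, and an empty variable is treated like an unset one, as `-n` and `:-` do in the shell.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical restatement of the wrapper script's flag selection. */
final class LauncherArgsSketch {

  /** Returns the variable's value, or the fallback when it is unset or empty. */
  static String orDefault(Map<String, String> env, String name, String fallback) {
    String value = env.get(name);
    return (value == null || value.isEmpty()) ? fallback : value;
  }

  /** Builds the profile-location and timestamp arguments in the order the script emits them. */
  static List<String> profilerArgs(Map<String, String> env, String metronHome) {
    List<String> args = new ArrayList<>();
    if (!orDefault(env, "SPARK_PROFILER_USE_ZOOKEEPER", "").isEmpty()) {
      args.add("--zookeeper");
      args.add(orDefault(env, "ZOOKEEPER", "node1:2181"));
    } else {
      args.add("--profiles");
      args.add(orDefault(env, "PROFILES", metronHome + "/config/zookeeper/profiler.json"));
    }
    String timestampField = orDefault(env, "SPARK_PROFILER_EVENT_TIMESTAMP_FIELD", "");
    if (!timestampField.isEmpty()) {
      args.add("--timestampfield");
      args.add(timestampField);
    }
    return args;
  }

  public static void main(String[] args) {
    // The METRON_HOME value here is only an illustrative placeholder.
    System.out.println(profilerArgs(System.getenv(), "/usr/metron/current"));
  }
}
```

The script additionally forwards any extra arguments through `"$@"`, which this sketch leaves out.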
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java
index c27495e..5be195a 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerCLITest.java
@@ -19,6 +19,7 @@
package org.apache.metron.profiler.spark.cli;
+import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.MissingOptionException;
import org.junit.Test;
@@ -40,7 +41,8 @@ public class BatchProfilerCLITest {
}
/**
- * The user must define the -p, --profiles option. The Profiler cannot work without profiles.
+ * The user must define one of the -p, --profiles or -z, --zookeeper options.
+ * The Profiler cannot work without profiles.
*/
@Test(expected = MissingOptionException.class)
public void mustDefineProfilesOption() throws Exception {
@@ -49,6 +51,30 @@ public class BatchProfilerCLITest {
}
/**
+ * The user must define only one of the -p, --profiles or -z, --zookeeper options.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void mustDefineOnlyOneProfilesOption() throws Exception {
+ String[] args = new String[] {
+ "--profiles", "src/test/resources/profiles-no-timestamp-field.json",
+ "--zookeeper", "node1:2181"
+ };
+ BatchProfilerCLI.main(args);
+ }
+
+ /**
+ * If a timestamp option is given, it must contain a field name
+ */
+ @Test(expected = MissingArgumentException.class)
+ public void mustDefineFieldnametoGoWithTimestamp() throws Exception {
+ String[] args = new String[] {
+ "--timestampfield"
+ };
+ BatchProfilerCLI.main(args);
+ }
+
+
+ /**
* If the profile definition contains no valid profiles, we have a problem.
*/
@Test(expected = IllegalArgumentException.class)
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerZKIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerZKIntegrationTest.java
new file mode 100644
index 0000000..d77e775
--- /dev/null
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/cli/BatchProfilerZKIntegrationTest.java
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.spark.cli;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
+import org.apache.metron.common.configuration.profiler.ProfilerConfig;
+import org.apache.metron.integration.TestZKServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+
+public class BatchProfilerZKIntegrationTest {
+ /**
+ * {
+ * "profiles": [
+ * {
+ * "profile": "profile1",
+ * "foreach": "ip_src_addr",
+ * "init": { "count": "0" },
+ * "update": { "count": "count + 1" },
+ * "result": "count"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ private String profile;
+
+ @Test
+ public void testProfilerZookeeperIntegration() throws Exception {
+ final byte[] profileExpectedByte = profile.getBytes(StandardCharsets.UTF_8);
+ final ProfilerConfig expectedProfileConfig = ProfilerConfig.fromBytes(profileExpectedByte);
+
+ TestZKServer.runWithZK( (zkServer, zkClient) -> {
+ // write bytes to zookeeper
+ ConfigurationsUtils.writeProfilerConfigToZookeeper(profileExpectedByte, zkClient);
+
+ // read bytes from zookeeper utilizing Batch Profiler functions
+ final ProfilerConfig profiles = BatchProfilerCLI.readProfileFromZK(zkClient);
+
+ // compare expected values
+ Assert.assertEquals("Profile read from zookeeper has changes", expectedProfileConfig, profiles);
+ });
+ }
+
+ @Test
+ public void testProfileZookeeperIntegrationFails() throws Exception {
+ final byte[] profileExpectedByte = profile.getBytes(StandardCharsets.UTF_8);
+ final ProfilerConfig expectedProfileConfig = ProfilerConfig.fromBytes(profileExpectedByte);
+ expectedProfileConfig.setTimestampField("foobar");
+
+ TestZKServer.runWithZK( (zkServer, zkClient) -> {
+ // write bytes to zookeeper
+ ConfigurationsUtils.writeProfilerConfigToZookeeper(profileExpectedByte, zkClient);
+
+ // read bytes from zookeeper utilizing Batch Profiler functions
+ final ProfilerConfig profiles = BatchProfilerCLI.readProfileFromZK(zkClient);
+
+ // compare expected values
+ Assert.assertNotEquals("Profile zookeeper integration test fails to detect change", expectedProfileConfig, profiles);
+ });
+ }
+}
diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/TestZKServer.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/TestZKServer.java
new file mode 100644
index 0000000..9ce02e4
--- /dev/null
+++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/TestZKServer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.integration;
+
+import java.io.Closeable;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.stellar.common.configuration.ConfigurationsUtils;
+
+
+/**
+ * Closeable wrapper around ZKServerComponent so it can be cleanly used in a resource protection block
+ */
+public class TestZKServer implements Closeable {
+ private ZKServerComponent testZkServer;
+ private String zookeeperUrl;
+
+ /**
+ * BiConsuming interface that allows Exceptions to be thrown
+ */
+ @FunctionalInterface
+ public interface ThrowingBiConsumer<T, U> {
+ void accept(T t, U u) throws Exception;
+ }
+
+ /**
+ * Utility method to allow lambdas to automatically be fed a started Zookeeper client and server
+ * which are automatically cleaned up after the lambda finishes running or throws an exception
+ * @param testFunc Lambda containing the code to run with the zookeeper client/server
+ * @throws Exception Any exceptions thrown by the 'testFunc' lambda will bubble up the call chain
+ */
+ static public void runWithZK(ThrowingBiConsumer<TestZKServer,CuratorFramework> testFunc) throws Exception {
+ try (TestZKServer zkServer = new TestZKServer();
+ CuratorFramework zkClient = zkServer.newClient()) {
+ zkClient.start();
+ testFunc.accept(zkServer, zkClient);
+ }
+ }
+
+ public TestZKServer() throws UnableToStartException {
+ testZkServer = new ZKServerComponent();
+ testZkServer.start();
+ zookeeperUrl = testZkServer.getConnectionString();
+ }
+
+ public String getZookeeperUrl() {
+ return zookeeperUrl;
+ }
+
+ /**
+ * Create a new zookeeper client configured to use our test Zookeeper server
+ * @return CuratorFramework client
+ */
+ public CuratorFramework newClient() {
+ return ConfigurationsUtils.getClient(zookeeperUrl);
+ }
+
+ @Override
+ public void close() {
+ testZkServer.stop();
+ testZkServer.reset();
+ }
+}
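
Reviewer note: `TestZKServer.runWithZK` above combines a throwing functional interface with try-with-resources. The sketch below shows the same pattern with JDK-only stand-ins, so the clean-up order is easy to see. `RunWithResourcesSketch`, `Tracked`, and `runWith` are hypothetical names; no Zookeeper server or Curator client is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, JDK-only illustration of the runWithZK pattern. */
final class RunWithResourcesSketch {

  /** Like java.util.function.BiConsumer, but the body may throw checked exceptions. */
  @FunctionalInterface
  interface ThrowingBiConsumer<T, U> {
    void accept(T t, U u) throws Exception;
  }

  /** Stand-in resource that records when it is opened and closed. */
  static final class Tracked implements AutoCloseable {
    private final String name;
    private final List<String> log;

    Tracked(String name, List<String> log) {
      this.name = name;
      this.log = log;
      log.add("open " + name);
    }

    @Override
    public void close() {
      log.add("close " + name);
    }
  }

  /** Opens "server" then "client", runs the body, and always closes both in reverse order. */
  static void runWith(List<String> log, ThrowingBiConsumer<Tracked, Tracked> body) throws Exception {
    try (Tracked server = new Tracked("server", log);
         Tracked client = new Tracked("client", log)) {
      body.accept(server, client);
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> log = new ArrayList<>();
    runWith(log, (server, client) -> log.add("body"));
    System.out.println(log);
  }
}
```

Because resources are closed in the reverse of their declaration order, the client is released before the server it talks to, including when the body throws.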