You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/08/31 19:20:29 UTC
[33/50] [abbrv] metron git commit: METRON-1725 Add ability to specify YARN queue for pcap jobs (merrimanr) closes apache/metron#1153
METRON-1725 Add ability to specify YARN queue for pcap jobs (merrimanr) closes apache/metron#1153
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7a8c2467
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7a8c2467
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7a8c2467
Branch: refs/remotes/apache/feature/METRON-1699-create-batch-profiler
Commit: 7a8c246748a2c9e8c5b9230800b075dd99a7f3a4
Parents: 73dc63e
Author: merrimanr <me...@gmail.com>
Authored: Fri Aug 10 16:46:31 2018 -0500
Committer: rmerriman <me...@gmail.com>
Committed: Fri Aug 10 16:46:31 2018 -0500
----------------------------------------------------------------------
.../CURRENT/configuration/metron-rest-env.xml | 9 ++++++
.../package/scripts/params/params_linux.py | 1 +
.../METRON/CURRENT/package/templates/metron.j2 | 1 +
.../METRON/CURRENT/themes/metron_theme.json | 10 ++++++
metron-interface/metron-rest/README.md | 2 ++
.../src/main/config/rest_application.yml | 1 +
.../apache/metron/rest/MetronRestConstants.java | 1 +
.../metron/rest/config/PcapJobSupplier.java | 2 +-
.../rest/service/impl/PcapServiceImpl.java | 12 ++++++-
.../apache/metron/rest/mock/MockPcapJob.java | 8 +++++
.../rest/service/impl/PcapServiceImplTest.java | 7 ++++-
metron-platform/metron-pcap-backend/README.md | 2 ++
.../org/apache/metron/pcap/query/CliParser.java | 4 +++
.../org/apache/metron/pcap/query/PcapCli.java | 3 ++
.../apache/metron/pcap/query/PcapCliTest.java | 33 ++++++++++++++++++--
.../apache/metron/pcap/config/PcapConfig.java | 10 ++++++
16 files changed, 101 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
index 20f9767..895c091 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
@@ -192,5 +192,14 @@
<description>The number of pcaps written to a page/file as a result of a pcap query.</description>
<value>10</value>
</property>
+ <property>
+ <name>pcap_yarn_queue</name>
+ <display-name>Pcap YARN Queue</display-name>
+ <description>The YARN queue pcap jobs will be submitted to.</description>
+ <value/>
+ <value-attributes>
+ <empty-value-valid>true</empty-value-valid>
+ </value-attributes>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 73d3469..4f8a9a7 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -390,6 +390,7 @@ pcap_base_path = config['configurations']['metron-rest-env']['pcap_base_path']
pcap_base_interim_result_path = config['configurations']['metron-rest-env']['pcap_base_interim_result_path']
pcap_final_output_path = config['configurations']['metron-rest-env']['pcap_final_output_path']
pcap_page_size = config['configurations']['metron-rest-env']['pcap_page_size']
+pcap_yarn_queue = config['configurations']['metron-rest-env']['pcap_yarn_queue']
pcap_configured_flag_file = status_params.pcap_configured_flag_file
# MapReduce
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
index 278d6f8..55422d0 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
@@ -44,3 +44,4 @@ PCAP_BASE_PATH="{{pcap_base_path}}"
PCAP_BASE_INTERIM_RESULT_PATH="{{pcap_base_interim_result_path}}"
PCAP_FINAL_OUTPUT_PATH="{{pcap_final_output_path}}"
PCAP_PAGE_SIZE="{{pcap_page_size}}"
+PCAP_YARN_QUEUE="{{pcap_yarn_queue}}"
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index 9f5b04e..db06b61 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -824,6 +824,10 @@
"subsection-name": "subsection-rest"
},
{
+ "config": "metron-rest-env/pcap_yarn_queue",
+ "subsection-name": "subsection-rest"
+ },
+ {
"config": "metron-management-ui-env/metron_management_ui_port",
"subsection-name": "subsection-management-ui"
},
@@ -1431,6 +1435,12 @@
}
},
{
+ "config": "metron-rest-env/pcap_yarn_queue",
+ "widget": {
+ "type": "text-field"
+ }
+ },
+ {
"config": "metron-management-ui-env/metron_management_ui_port",
"widget": {
"type": "text-field"
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/README.md
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md
index 489cd9f..d19d8c3 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -221,6 +221,8 @@ The REST application uses a Java Process object to call out to the `pcap_to_pdml
Out of the box it is a simple wrapper around the tshark command to transform raw pcap data to PDML. However it can be extended to do additional processing as long as the expected input/output is maintained.
REST will supply the script with raw pcap data through standard in and expects PDML data serialized as XML.
+Pcap query jobs can be configured for submission to a YARN queue. This setting is exposed as the Spring property `pcap.yarn.queue`. If configured, the REST application will set the `mapreduce.job.queuename` Hadoop property to that value.
+
## API
Request and Response objects are JSON formatted. The JSON schemas are available in the Swagger UI.
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/main/config/rest_application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml b/metron-interface/metron-rest/src/main/config/rest_application.yml
index 7486112..e25ad82 100644
--- a/metron-interface/metron-rest/src/main/config/rest_application.yml
+++ b/metron-interface/metron-rest/src/main/config/rest_application.yml
@@ -60,3 +60,4 @@ pcap:
base.interim.result.path: ${PCAP_BASE_INTERIM_RESULT_PATH}
final.output.path: ${PCAP_FINAL_OUTPUT_PATH}
page.size: ${PCAP_PAGE_SIZE}
+ yarn.queue: ${PCAP_YARN_QUEUE}
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index d38aedb..02655298 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -81,4 +81,5 @@ public class MetronRestConstants {
public static final String PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY = "pcap.final.output.path";
public static final String PCAP_PAGE_SIZE_SPRING_PROPERTY = "pcap.page.size";
public static final String PCAP_PDML_SCRIPT_PATH_SPRING_PROPERTY = "pcap.pdml.script.path";
+ public static final String PCAP_YARN_QUEUE_SPRING_PROPERTY = "pcap.yarn.queue";
}
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
index 1e79f6a..538e41a 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
@@ -39,7 +39,7 @@ public class PcapJobSupplier implements Supplier<Statusable<Path>> {
PcapJob<Path> pcapJob = createPcapJob();
return pcapJob.submit(PcapFinalizerStrategies.REST, pcapRequest);
} catch (JobException e) {
- throw new RuntimeJobException(e.getMessage());
+ throw new RuntimeJobException(e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
index ae3f807..db2e17b 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.metron.job.JobException;
import org.apache.metron.job.JobNotFoundException;
import org.apache.metron.job.JobStatus;
@@ -57,6 +58,8 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
+import static org.apache.metron.rest.MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY;
+
@Service
public class PcapServiceImpl implements PcapService {
@@ -250,7 +253,14 @@ public class PcapServiceImpl implements PcapService {
protected void setPcapOptions(String username, PcapRequest pcapRequest) throws IOException {
PcapOptions.JOB_NAME.put(pcapRequest, "jobName");
PcapOptions.USERNAME.put(pcapRequest, username);
- PcapOptions.HADOOP_CONF.put(pcapRequest, configuration);
+ Configuration hadoopConf = new Configuration(configuration);
+ if (environment.containsProperty(PCAP_YARN_QUEUE_SPRING_PROPERTY)) {
+ String queue = environment.getProperty(PCAP_YARN_QUEUE_SPRING_PROPERTY);
+ if (queue != null && !queue.isEmpty()) {
+ hadoopConf.set(MRJobConfig.QUEUE_NAME, environment.getProperty(PCAP_YARN_QUEUE_SPRING_PROPERTY));
+ }
+ }
+ PcapOptions.HADOOP_CONF.put(pcapRequest, hadoopConf);
PcapOptions.FILESYSTEM.put(pcapRequest, getFileSystem());
if (pcapRequest.getBasePath() == null) {
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
index 779589d..c977faa 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
@@ -17,7 +17,9 @@
*/
package org.apache.metron.rest.mock;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.metron.job.Finalizer;
import org.apache.metron.job.JobException;
import org.apache.metron.job.JobStatus;
@@ -45,6 +47,7 @@ public class MockPcapJob extends PcapJob<Path> {
private PcapFilterConfigurator filterImpl;
private int recPerFile;
private String query;
+ private String yarnQueue;
private Statusable<Path> statusable;
public MockPcapJob() {
@@ -68,6 +71,7 @@ public class MockPcapJob extends PcapJob<Path> {
}
this.filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
this.recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(configuration, Integer.class);
+ this.yarnQueue = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class).get(MRJobConfig.QUEUE_NAME);
return statusable;
}
@@ -144,4 +148,8 @@ public class MockPcapJob extends PcapJob<Path> {
public int getRecPerFile() {
return recPerFile;
}
+
+ public String getYarnQueue() {
+ return yarnQueue;
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
index d539c71..6635598 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.job.JobException;
@@ -186,7 +187,7 @@ public class PcapServiceImplTest {
@Before
public void setUp() throws Exception {
environment = mock(Environment.class);
- configuration = mock(Configuration.class);
+ configuration = new Configuration();
mockPcapJobSupplier = new MockPcapJobSupplier();
pcapToPdmlScriptWrapper = new PcapToPdmlScriptWrapper();
@@ -200,6 +201,9 @@ public class PcapServiceImplTest {
@Test
public void submitShouldProperlySubmitFixedPcapRequest() throws Exception {
+ when(environment.containsProperty(MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY)).thenReturn(true);
+ when(environment.getProperty(MetronRestConstants.PCAP_YARN_QUEUE_SPRING_PROPERTY)).thenReturn("pcap");
+
FixedPcapRequest fixedPcapRequest = new FixedPcapRequest();
fixedPcapRequest.setBasePath("basePath");
fixedPcapRequest.setBaseInterimResultPath("baseOutputPath");
@@ -250,6 +254,7 @@ public class PcapServiceImplTest {
Assert.assertEquals(2000000, mockPcapJob.getEndTimeNs());
Assert.assertEquals(2, mockPcapJob.getNumReducers());
Assert.assertEquals(100, mockPcapJob.getRecPerFile());
+ Assert.assertEquals("pcap", mockPcapJob.getYarnQueue());
Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
Map<String, String> actualFixedFields = mockPcapJob.getFixedFields();
Assert.assertEquals("ip_src_addr", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName()));
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap-backend/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/README.md b/metron-platform/metron-pcap-backend/README.md
index 2ff20d8..031328d 100644
--- a/metron-platform/metron-pcap-backend/README.md
+++ b/metron-platform/metron-pcap-backend/README.md
@@ -139,6 +139,7 @@ usage: Fixed filter options
-sa,--ip_src_addr <arg> Source IP address
-sp,--ip_src_port <arg> Source port
-st,--start_time <arg> (required) Packet start time range.
+ -yq,--yarn_queue <arg> Yarn queue this job will be submitted to
```
```
@@ -158,6 +159,7 @@ usage: Query filter options
-ps,--print_status Print the status of the job as it runs
-q,--query <arg> Query string to use as a filter
-st,--start_time <arg> (required) Packet start time range.
+ -yq,--yarn_queue <arg> Yarn queue this job will be submitted to
```
The Query filter's `--query` argument specifies the Stellar expression to
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index 69c725c..5040f90 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -56,6 +56,7 @@ public class CliParser {
options.addOption(newOption("et", "end_time", true, "Packet end time range. Default is current system time."));
options.addOption(newOption("df", "date_format", true, "Date format to use for parsing start_time and end_time. Default is to use time in millis since the epoch."));
options.addOption(newOption("ps", "print_status", false, "Print the status of the job as it runs"));
+ options.addOption(newOption("yq", "yarn_queue", true, "Yarn queue this job will be submitted to"));
return options;
}
@@ -129,6 +130,9 @@ public class CliParser {
if (commandLine.hasOption("print_status")) {
config.setPrintJobStatus(true);
}
+ if (commandLine.hasOption("yarn_queue")) {
+ config.setYarnQueue(commandLine.getOptionValue("yarn_queue"));
+ }
}
public void printHelp(String msg, Options opts) {
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index c23f037..eebf366 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.job.JobException;
@@ -99,6 +100,7 @@ public class PcapCli {
return 0;
}
PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
+ config.getYarnQueue().ifPresent(s -> hadoopConf.set(MRJobConfig.QUEUE_NAME, s));
PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
try {
PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf));
@@ -124,6 +126,7 @@ public class PcapCli {
return 0;
}
PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
+ config.getYarnQueue().ifPresent(s -> hadoopConf.set(MRJobConfig.QUEUE_NAME, s));
PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
try {
PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf));
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 7c75224..a71e997 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -33,10 +33,18 @@ import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.metron.common.Constants;
import org.apache.metron.common.system.Clock;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
@@ -114,7 +122,24 @@ public class PcapCliTest {
return new TypeSafeMatcher<Map<K, V>>() {
@Override
protected boolean matchesSafely(Map<K, V> item) {
- return item.entrySet().containsAll(map.entrySet());
+ for(K key: map.keySet()) {
+ if (key.equals(PcapOptions.HADOOP_CONF.getKey())) {
+ Configuration itemConfiguration = (Configuration) item.get(PcapOptions.HADOOP_CONF.getKey());
+ Map<String, Object> mapConfiguration = (Map<String, Object>) map.get(PcapOptions.HADOOP_CONF.getKey());
+ for(String setting: mapConfiguration.keySet()) {
+ if (!mapConfiguration.get(setting).equals(itemConfiguration.get(setting, ""))) {
+ return false;
+ }
+ }
+ } else {
+ V itemValue = item.get(key);
+ V mapValue = map.get(key);
+ if (itemValue != null ? !itemValue.equals(mapValue) : mapValue != null) {
+ return false;
+ }
+ }
+ }
+ return true;
}
@Override
@@ -192,7 +217,8 @@ public class PcapCliTest {
"-include_reverse",
"-num_reducers", "10",
"-records_per_file", "1000",
- "-ps"
+ "-ps",
+ "-yq", "pcap"
};
Map<String, String> query = new HashMap<String, String>() {{
put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
@@ -215,6 +241,9 @@ public class PcapCliTest {
PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L); // needed bc defaults in config
PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
PcapOptions.PRINT_JOB_STATUS.put(config, true);
+ PcapOptions.HADOOP_CONF.put(config, new HashMap<String, Object>() {{
+ put(MRJobConfig.QUEUE_NAME, "pcap");
+ }});
when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
http://git-wip-us.apache.org/repos/asf/metron/blob/7a8c2467/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
index cbb8170..4a08e14 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapConfig.java
@@ -25,6 +25,7 @@ import org.apache.metron.common.configuration.ConfigOption;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
+import java.util.Optional;
import java.util.function.Function;
public class PcapConfig extends AbstractMapDecorator<String, Object>{
@@ -32,6 +33,7 @@ public class PcapConfig extends AbstractMapDecorator<String, Object>{
private boolean showHelp;
private DateFormat dateFormat;
+ private String yarnQueue;
public PcapConfig() {
super(new HashMap<>());
@@ -137,4 +139,12 @@ public class PcapConfig extends AbstractMapDecorator<String, Object>{
public void setNumRecordsPerFile(int numRecordsPerFile) {
PcapOptions.NUM_RECORDS_PER_FILE.put(this, numRecordsPerFile);
}
+
+ public void setYarnQueue(String yarnQueue) {
+ this.yarnQueue = yarnQueue;
+ }
+
+ public Optional<String> getYarnQueue() {
+ return Optional.ofNullable(yarnQueue);
+ }
}