You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/08/17 15:34:53 UTC
[38/51] [abbrv] metron git commit: METRON-1728: Handle null values in config in Pcap backend more gracefully (mmiklavc via mmiklavc) closes apache/metron#1151
METRON-1728: Handle null values in config in Pcap backend more gracefully (mmiklavc via mmiklavc) closes apache/metron#1151
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9064cca0
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9064cca0
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9064cca0
Branch: refs/heads/master
Commit: 9064cca0317881176471c51abd16e99bf2ad7b10
Parents: 14dcb2d
Author: mmiklavc <mi...@gmail.com>
Authored: Thu Aug 9 09:25:29 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Thu Aug 9 09:25:29 2018 -0600
----------------------------------------------------------------------
.../common/configuration/ConfigOption.java | 32 ++++--
.../common/configuration/ConfigOptionTest.java | 112 +++++++++++++++++++
.../org/apache/metron/pcap/query/CliParser.java | 25 +++--
.../org/apache/metron/pcap/PcapJobTest.java | 23 ++++
.../apache/metron/pcap/query/PcapCliTest.java | 10 +-
.../metron/pcap/config/PcapGlobalDefaults.java | 28 +++++
.../metron/pcap/finalizer/PcapFinalizer.java | 8 +-
.../pcap/finalizer/PcapRestFinalizer.java | 11 +-
.../java/org/apache/metron/pcap/mr/PcapJob.java | 25 +++--
9 files changed, 237 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
index 8e4211b..6308f0a 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
@@ -18,36 +18,54 @@
package org.apache.metron.common.configuration;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-
import java.util.Map;
import java.util.function.BiFunction;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
public interface ConfigOption {
+
String getKey();
+
default BiFunction<String, Object, Object> transform() {
- return (s,o) -> o;
+ return (s, o) -> o;
}
default void put(Map<String, Object> map, Object value) {
map.put(getKey(), value);
}
+ default <T> T getOrDefault(Map<String, Object> map, Class<T> clazz, T defaultValue) {
+ T val;
+ return ((val = get(map, clazz)) == null ? defaultValue : val);
+ }
+
default <T> T get(Map<String, Object> map, Class<T> clazz) {
Object obj = map.get(getKey());
- if(clazz.isInstance(obj)) {
+ if (clazz.isInstance(obj)) {
return clazz.cast(obj);
- }
- else {
+ } else {
return ConversionUtils.convert(obj, clazz);
}
}
- default <T> T get(Map<String, Object> map, BiFunction<String, Object, T> transform, Class<T> clazz) {
+ default <T> T getOrDefault(Map<String, Object> map, BiFunction<String, Object, T> transform,
+ Class<T> clazz, T defaultValue) {
+ T val;
+ return ((val = get(map, transform, clazz)) == null ? defaultValue : val);
+ }
+
+ default <T> T get(Map<String, Object> map, BiFunction<String, Object, T> transform,
+ Class<T> clazz) {
return clazz.cast(transform.apply(getKey(), map.get(getKey())));
}
+ default <T> T getTransformedOrDefault(Map<String, Object> map, Class<T> clazz, T defaultValue) {
+ T val;
+ return ((val = getTransformed(map, clazz)) == null ? defaultValue : val);
+ }
+
default <T> T getTransformed(Map<String, Object> map, Class<T> clazz) {
return clazz.cast(transform().apply(getKey(), map.get(getKey())));
}
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java
new file mode 100644
index 0000000..95db080
--- /dev/null
+++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/ConfigOptionTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.configuration;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the default interface methods
+ */
+public class ConfigOptionTest {
+
+ @Before
+ public void setup() {
+ }
+
+ @Test
+ public void gets_value_of_specified_type() {
+ ConfigOption option = newOption("foo");
+ Map<String, Object> config = new HashMap<>();
+ option.put(config, 25L);
+ assertThat(option.get(config, Long.class), equalTo(25L));
+ assertThat(option.get(mapWith("foo", 25L), Long.class), equalTo(25L));
+ }
+
+ @Test
+ public void gets_value_of_specified_type_with_transform() {
+ ConfigOption option = newOption("foo");
+ Map<String, Object> config = new HashMap<>();
+ option.put(config, "25");
+ BiFunction<String, Object, Long> transform = (s, o) -> o == null ? null
+ : new Long(o.toString());
+ assertThat(option.get(config, transform, Long.class), equalTo(25L));
+ assertThat(option.get(mapWith("foo", "25"), transform, Long.class), equalTo(25L));
+ }
+
+ @Test
+ public void gets_default_value_of_specified_type_with_transform() {
+ ConfigOption option = newOption("foo");
+ Map<String, Object> config = new HashMap<>();
+ option.put(config, null);
+ BiFunction<String, Object, Long> transform = (s, o) -> o == null ? null
+ : new Long(o.toString());
+ assertThat(option.getOrDefault(config, transform, Long.class, 25L), equalTo(25L));
+ assertThat(option.getOrDefault(mapWith("foo", null), transform, Long.class, 25L), equalTo(25L));
+ }
+
+ @Test
+ public void gets_default_when_null_value() {
+ ConfigOption option = newOption("foo");
+ Map<String, Object> config = new HashMap<>();
+ option.put(config, null);
+ assertThat(option.getOrDefault(config, Long.class, 0L), equalTo(0L));
+ assertThat(option.getOrDefault(mapWith("foo", null), Long.class, 0L), equalTo(0L));
+ }
+
+ @Test
+ public void gets_object_transformed_by_class_cast() {
+ ConfigOption option = newOption("foo");
+ Map<String, Object> config = new HashMap<>();
+ option.put(config, (Object) 25L);
+ assertThat(option.getTransformed(config, Long.class), equalTo(25L));
+ assertThat(option.getTransformed(mapWith("foo", (Object) 25L), Long.class), equalTo(25L));
+ }
+
+ @Test
+ public void gets_default_null_with_cast_when_null() {
+ ConfigOption option = newOption("foo");
+ Map<String, Object> config = new HashMap<>();
+ option.put(config, null);
+ assertThat(option.getTransformedOrDefault(config, Long.class, 25L), equalTo(25L));
+ assertThat(option.getTransformedOrDefault(mapWith("foo", null), Long.class, 25L), equalTo(25L));
+ }
+
+ private <K, V> Map<K, V> mapWith(K key, V val) {
+ Map<K, V> map = new HashMap<>();
+ map.put(key, val);
+ return map;
+ }
+
+ private ConfigOption newOption(final String key) {
+ return new ConfigOption() {
+ @Override
+ public String getKey() {
+ return key;
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index 4ad6ffa..2d15e8b 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -18,17 +18,23 @@
package org.apache.metron.pcap.query;
-import org.apache.commons.cli.*;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.metron.pcap.config.PcapConfig;
/**
* Provides common required fields for the PCAP filter jobs
*/
public class CliParser {
- public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap/input";
- public static final String BASE_INTERIM_OUTPUT_PATH_DEFAULT = "/tmp";
- public static final int NUM_REDUCERS_DEFAULT = 10;
- public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
private CommandLineParser parser;
protected PcapConfig.PrefixStrategy prefixStrategy;
@@ -40,9 +46,10 @@ public class CliParser {
public Options buildOptions() {
Options options = new Options();
options.addOption(newOption("h", "help", false, "Display help"));
- options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT)));
+ options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'",
+ BASE_INPUT_PATH_DEFAULT)));
options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'",
- BASE_INTERIM_OUTPUT_PATH_DEFAULT)));
+ BASE_INTERIM_RESULT_PATH_DEFAULT)));
options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT)));
options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT)));
@@ -71,12 +78,12 @@ public class CliParser {
if (commandLine.hasOption("base_path")) {
config.setBasePath(commandLine.getOptionValue("base_path"));
} else {
- config.setBasePath(BASE_PATH_DEFAULT);
+ config.setBasePath(BASE_INPUT_PATH_DEFAULT);
}
if (commandLine.hasOption("base_output_path")) {
config.setBaseInterimResultPath(commandLine.getOptionValue("base_output_path"));
} else {
- config.setBaseInterimResultPath(BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+ config.setBaseInterimResultPath(BASE_INTERIM_RESULT_PATH_DEFAULT);
}
if (commandLine.hasOption("start_time")) {
try {
http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
index 14963fd..796c8a5 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -264,4 +264,27 @@ public class PcapJobTest {
Assert.assertThat(status.getState(), equalTo(State.KILLED));
}
+ @Test
+ public void handles_null_values_with_defaults() throws Exception {
+ PcapOptions.START_TIME_NS.put(config, null);
+ PcapOptions.END_TIME_NS.put(config, null);
+ PcapOptions.NUM_REDUCERS.put(config, null);
+ PcapOptions.NUM_RECORDS_PER_FILE.put(config, null);
+
+ pageableResult = new PcapPages(
+ Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt")));
+ when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
+ when(mrJob.isComplete()).thenReturn(true);
+ when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
+ when(mrJob.getStatus()).thenReturn(mrStatus);
+ Statusable<Path> statusable = testJob.submit(finalizer, config);
+ timer.updateJobStatus();
+ Pageable<Path> results = statusable.get();
+ Assert.assertThat(results.getSize(), equalTo(3));
+ JobStatus status = statusable.getStatus();
+ Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
+ Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
+ Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index c7d6fdf..96ca354 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.metron.pcap.query;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.BASE_INPUT_PATH_DEFAULT;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.argThat;
@@ -91,8 +93,8 @@ public class PcapCliTest {
put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "`casey`");
}};
FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
- PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
- PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+ PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT);
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT);
PcapOptions.FIELDS.put(config, query);
PcapOptions.NUM_REDUCERS.put(config, 10);
PcapOptions.START_TIME_MS.put(config, 500L);
@@ -237,8 +239,8 @@ public class PcapCliTest {
String query = "some query string";
FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
- PcapOptions.BASE_PATH.put(config, CliParser.BASE_PATH_DEFAULT);
- PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, CliParser.BASE_INTERIM_OUTPUT_PATH_DEFAULT);
+ PcapOptions.BASE_PATH.put(config, BASE_INPUT_PATH_DEFAULT);
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, BASE_INTERIM_RESULT_PATH_DEFAULT);
PcapOptions.FIELDS.put(config, query);
PcapOptions.NUM_REDUCERS.put(config, 10);
PcapOptions.START_TIME_MS.put(config, 500L);
http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
new file mode 100644
index 0000000..b8c674c
--- /dev/null
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapGlobalDefaults.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.pcap.config;
+
+public class PcapGlobalDefaults {
+ public static final String BASE_PCAP_PATH_DEFAULT = "/apps/metron/pcap";
+ public static final String BASE_INPUT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/input";
+ public static final String BASE_INTERIM_RESULT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/interim";
+ public static final String FINAL_OUTPUT_PATH_DEFAULT = BASE_PCAP_PATH_DEFAULT + "/output";
+ public static final int NUM_REDUCERS_DEFAULT = 10;
+ public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
index 8dcc401..5a61f9b 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
@@ -18,6 +18,8 @@
package org.apache.metron.pcap.finalizer;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_RECORDS_PER_FILE_DEFAULT;
+
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -62,9 +64,9 @@ public abstract class PcapFinalizer implements Finalizer<Path> {
@Override
public Pageable<Path> finalizeJob(Map<String, Object> config) throws JobException {
Configuration hadoopConfig = PcapOptions.HADOOP_CONF.get(config, Configuration.class);
- int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(config, Integer.class);
- Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH
- .get(config, PcapOptions.STRING_TO_PATH, Path.class);
+ int recPerFile = PcapOptions.NUM_RECORDS_PER_FILE
+ .getOrDefault(config, Integer.class, NUM_RECORDS_PER_FILE_DEFAULT);
+ Path interimResultPath = PcapOptions.INTERIM_RESULT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
FileSystem fs = PcapOptions.FILESYSTEM.get(config, FileSystem.class);
SequenceFileIterable interimResults = null;
http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
index 93a3222..13fa795 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
@@ -18,14 +18,15 @@
package org.apache.metron.pcap.finalizer;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.FINAL_OUTPUT_PATH_DEFAULT;
+
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.metron.job.Statusable;
import org.apache.metron.pcap.config.PcapOptions;
-
-import java.util.Map;
import org.apache.metron.pcap.writer.PcapResultsWriter;
/**
@@ -45,10 +46,12 @@ public class PcapRestFinalizer extends PcapFinalizer {
@Override
protected Path getOutputPath(Map<String, Object> config, int partition) {
- String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, String.class);
+ String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH
+ .getOrDefault(config, String.class, FINAL_OUTPUT_PATH_DEFAULT);
String user = PcapOptions.USERNAME.get(config, String.class);
String jobId = PcapOptions.JOB_ID.get(config, String.class);
- return new Path(String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition));
+ return new Path(
+ String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition));
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/9064cca0/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index ea2aa29..23bd510 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -20,6 +20,7 @@ package org.apache.metron.pcap.mr;
import static org.apache.metron.pcap.PcapHelper.greaterThanOrEqualTo;
import static org.apache.metron.pcap.PcapHelper.lessThanOrEqualTo;
+import static org.apache.metron.pcap.config.PcapGlobalDefaults.NUM_REDUCERS_DEFAULT;
import com.google.common.base.Joiner;
import java.io.IOException;
@@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.job.Finalizer;
import org.apache.metron.job.JobException;
import org.apache.metron.job.JobStatus;
@@ -60,6 +62,7 @@ import org.apache.metron.job.Statusable;
import org.apache.metron.pcap.PacketInfo;
import org.apache.metron.pcap.PcapHelper;
import org.apache.metron.pcap.PcapPages;
+import org.apache.metron.pcap.config.PcapGlobalDefaults;
import org.apache.metron.pcap.config.PcapOptions;
import org.apache.metron.pcap.filter.PcapFilter;
import org.apache.metron.pcap.filter.PcapFilterConfigurator;
@@ -216,20 +219,22 @@ public class PcapJob<T> implements Statusable<Path> {
Configuration hadoopConf = PcapOptions.HADOOP_CONF.get(configuration, Configuration.class);
FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class);
Path basePath = PcapOptions.BASE_PATH.getTransformed(configuration, Path.class);
- Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.getTransformed(configuration, Path.class);
- long startTime;
+ Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH
+ .getTransformedOrDefault(configuration, Path.class,
+ new Path(PcapGlobalDefaults.BASE_INTERIM_RESULT_PATH_DEFAULT));
+ long startTimeNs;
if (configuration.containsKey(PcapOptions.START_TIME_NS.getKey())) {
- startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class);
+ startTimeNs = PcapOptions.START_TIME_NS.getOrDefault(configuration, Long.class, 0L);
} else {
- startTime = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000;
+ startTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.START_TIME_MS.getOrDefault(configuration, Long.class, 0L));
}
- long endTime;
+ long endTimeNs;
if (configuration.containsKey(PcapOptions.END_TIME_NS.getKey())) {
- endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class);
+ endTimeNs = PcapOptions.END_TIME_NS.getOrDefault(configuration, Long.class, TimestampConverters.MILLISECONDS.toNanoseconds(System.currentTimeMillis()));
} else {
- endTime = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000;
+ endTimeNs = TimestampConverters.MILLISECONDS.toNanoseconds(PcapOptions.END_TIME_MS.getOrDefault(configuration, Long.class, System.currentTimeMillis()));
}
- int numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class);
+ int numReducers = PcapOptions.NUM_REDUCERS.getOrDefault(configuration, Integer.class, NUM_REDUCERS_DEFAULT);
T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class);
PcapFilterConfigurator<T> filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
@@ -237,8 +242,8 @@ public class PcapJob<T> implements Statusable<Path> {
Statusable<Path> statusable = query(jobName,
basePath,
baseInterimResultPath,
- startTime,
- endTime,
+ startTimeNs,
+ endTimeNs,
numReducers,
fields,
// create a new copy for each job, bad things happen when hadoop config is reused