You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/08/17 15:34:21 UTC
[06/51] [abbrv] metron git commit: METRON-1674 Create REST endpoint
for job status abstraction (merrimanr) closes apache/metron#1109
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
index 059bba2..95907df 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
@@ -18,27 +18,27 @@
package org.apache.metron.pcap.finalizer;
-import java.util.Map;
import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Statusable;
import org.apache.metron.pcap.config.PcapOptions;
+import java.util.Map;
+
/**
* Write to HDFS.
*/
public class PcapRestFinalizer extends PcapFinalizer {
- /**
- * Format will have the format <output-path>/page-<page-num>.pcap
- * The filename prefix is pluggable, but in most cases it will be provided via the PcapConfig
- * as a formatted timestamp + uuid. A final sample format will look as follows:
- * /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap
- */
- private static final String PCAP_CLI_FILENAME_FORMAT = "%s/page-%s.pcap";
+ private static final String PCAP_REST_FILEPATH_FORMAT = "%s/%s/%s/%s/page-%s.pcap";
+
+ private String jobType = Statusable.JobType.MAP_REDUCE.name();
@Override
- protected String getOutputFileName(Map<String, Object> config, int partition) {
- Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.getTransformed(config, Path.class);
- return String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, partition);
+ protected Path getOutputPath(Map<String, Object> config, int partition) {
+ String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, String.class);
+ String user = PcapOptions.USERNAME.get(config, String.class);
+ String jobId = PcapOptions.JOB_ID.get(config, String.class);
+ return new Path(String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition));
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 05c494b..1dd670d 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -215,14 +215,24 @@ public class PcapJob<T> implements Statusable<Path> {
FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class);
Path basePath = PcapOptions.BASE_PATH.getTransformed(configuration, Path.class);
Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.getTransformed(configuration, Path.class);
- long startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class);
- long endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class);
+ long startTime;
+ if (configuration.containsKey(PcapOptions.START_TIME_NS.getKey())) {
+ startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class);
+ } else {
+ startTime = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000;
+ }
+ long endTime;
+ if (configuration.containsKey(PcapOptions.END_TIME_NS.getKey())) {
+ endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class);
+ } else {
+ endTime = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000;
+ }
int numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class);
T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class);
PcapFilterConfigurator<T> filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
try {
- return query(jobName,
+ Statusable<Path> statusable = query(jobName,
basePath,
baseInterimResultPath,
startTime,
@@ -233,6 +243,8 @@ public class PcapJob<T> implements Statusable<Path> {
new Configuration(hadoopConf),
fileSystem,
filterImpl);
+ PcapOptions.JOB_ID.put(configuration, statusable.getStatus().getJobId());
+ return statusable;
} catch (IOException | InterruptedException | ClassNotFoundException e) {
throw new JobException("Failed to run pcap query.", e);
}