You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/08/17 15:34:21 UTC

[06/51] [abbrv] metron git commit: METRON-1674 Create REST endpoint for job status abstraction (merrimanr) closes apache/metron#1109

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
index 059bba2..95907df 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapRestFinalizer.java
@@ -18,27 +18,27 @@
 
 package org.apache.metron.pcap.finalizer;
 
-import java.util.Map;
 import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Statusable;
 import org.apache.metron.pcap.config.PcapOptions;
 
+import java.util.Map;
+
 /**
  * Write to HDFS.
  */
 public class PcapRestFinalizer extends PcapFinalizer {
 
-  /**
-   * Format will have the format <output-path>/page-<page-num>.pcap
-   * The filename prefix is pluggable, but in most cases it will be provided via the PcapConfig
-   * as a formatted timestamp + uuid. A final sample format will look as follows:
-   * /base/output/path/pcap-data-201807181911-09855b4ae3204dee8b63760d65198da3+0001.pcap
-   */
-  private static final String PCAP_CLI_FILENAME_FORMAT = "%s/page-%s.pcap";
+  private static final String PCAP_REST_FILEPATH_FORMAT = "%s/%s/%s/%s/page-%s.pcap";
+
+  private String jobType = Statusable.JobType.MAP_REDUCE.name();
 
   @Override
-  protected String getOutputFileName(Map<String, Object> config, int partition) {
-    Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.getTransformed(config, Path.class);
-    return String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, partition);
+  protected Path getOutputPath(Map<String, Object> config, int partition) {
+    String finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, String.class);
+    String user = PcapOptions.USERNAME.get(config, String.class);
+    String jobId = PcapOptions.JOB_ID.get(config, String.class);
+    return new Path(String.format(PCAP_REST_FILEPATH_FORMAT, finalOutputPath, user, jobType, jobId, partition));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
index 05c494b..1dd670d 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
@@ -215,14 +215,24 @@ public class PcapJob<T> implements Statusable<Path> {
     FileSystem fileSystem = PcapOptions.FILESYSTEM.get(configuration, FileSystem.class);
     Path basePath = PcapOptions.BASE_PATH.getTransformed(configuration, Path.class);
     Path baseInterimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.getTransformed(configuration, Path.class);
-    long startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class);
-    long endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class);
+    long startTime;
+    if (configuration.containsKey(PcapOptions.START_TIME_NS.getKey())) {
+      startTime = PcapOptions.START_TIME_NS.get(configuration, Long.class);
+    } else {
+      startTime = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000;
+    }
+    long endTime;
+    if (configuration.containsKey(PcapOptions.END_TIME_NS.getKey())) {
+      endTime = PcapOptions.END_TIME_NS.get(configuration, Long.class);
+    } else {
+      endTime = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000;
+    }
     int numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class);
     T fields = (T) PcapOptions.FIELDS.get(configuration, Object.class);
     PcapFilterConfigurator<T> filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
 
     try {
-      return query(jobName,
+      Statusable<Path> statusable = query(jobName,
           basePath,
           baseInterimResultPath,
           startTime,
@@ -233,6 +243,8 @@ public class PcapJob<T> implements Statusable<Path> {
           new Configuration(hadoopConf),
           fileSystem,
           filterImpl);
+      PcapOptions.JOB_ID.put(configuration, statusable.getStatus().getJobId());
+      return statusable;
     } catch (IOException | InterruptedException | ClassNotFoundException e) {
       throw new JobException("Failed to run pcap query.", e);
     }