You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/08/17 15:34:18 UTC

[03/51] [abbrv] metron git commit: METRON-1614: Create job status abstraction (mmiklavc via mmiklavc) closes apache/metron#1108

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/metron-job_state_statechart_diagram.svg
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/metron-job_state_statechart_diagram.svg b/metron-platform/metron-job/metron-job_state_statechart_diagram.svg
new file mode 100644
index 0000000..a99c5ad
--- /dev/null
+++ b/metron-platform/metron-job/metron-job_state_statechart_diagram.svg
@@ -0,0 +1,14 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="661px" height="291px" version="1.1" style="background-color: rgb(255, 255, 255);"><defs/><g transform="translate(0.5,0.5)"><ellipse cx="15" cy="30" rx="11" ry="11" fill="#000000" stroke="#ff0000" transform="rotate(90,15,30)" pointer-events="none"/><rect x="110" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(123.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="92" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 92px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-dec
 oration:inherit;">NOT_RUNNING</div></div></foreignObject><text x="46" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">NOT_RUNNING</text></switch></g><path d="M 440 30 L 537.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 529.88 34.5 L 538.88 30 L 529.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="320" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(350.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="58" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 60px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://ww
 w.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">RUNNING</div></div></foreignObject><text x="29" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">RUNNING</text></switch></g><rect x="540" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(563.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="72" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 74px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">FINALIZING</div></div></foreignObject><text x="36" y="12" fill="#0000
 00" text-anchor="middle" font-size="12px" font-family="Verdana">FINALIZING</text></switch></g><path d="M 30 30 L 107.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 99.88 34.5 L 108.88 30 L 99.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 230 30 L 317.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 309.88 34.5 L 318.88 30 L 309.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="320" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(357.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0,
  0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">KILLED</div></div></foreignObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">KILLED</text></switch></g><rect x="320" y="230" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(357.5,253.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml"
  style="display:inline-block;text-align:inherit;text-decoration:inherit;">FAILED</div></div></foreignObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">FAILED</text></switch></g><rect x="540" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(562.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="75" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 76px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">SUCCEEDED</div></div></foreignObject><text x="38" y="12" fill="#000000" text-anchor="mid
 dle" font-size="12px" font-family="Verdana">SUCCEEDED</text></switch></g><path d="M 380 60 L 380 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 375.5 109.88 L 380 118.88 L 384.5 109.88" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 600 60 L 600 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 595.5 109.88 L 600 118.88 L 604.5 109.88" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 540 45 L 500 45 Q 490 45 490 55 L 490 250 Q 490 260 480 260 L 442.24 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 450.12 255.5 L 441.12 260 L 450.12 264.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 320 45 L 290 45 Q 280 45 280 55 L 280 250 Q 280 260 290 260 L 317.76 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path 
 d="M 309.88 264.5 L 318.88 260 L 309.88 255.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 540 45 L 500 45 Q 490 45 490 55 L 490 140 Q 490 150 480 150 L 442.24 150" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 450.12 145.5 L 441.12 150 L 450.12 154.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/></g></svg>

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/metron-job_state_statechart_diagram.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/metron-job_state_statechart_diagram.xml b/metron-platform/metron-job/metron-job_state_statechart_diagram.xml
new file mode 100644
index 0000000..b9ee8aa
--- /dev/null
+++ b/metron-platform/metron-job/metron-job_state_statechart_diagram.xml
@@ -0,0 +1,14 @@
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+<!-- This is a draw.io diagram.  You can load it from http://www.draw.io -->
+<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36" version="8.9.5" editor="www.draw.io" type="google"><diagram id="58cdce13-f638-feb5-8d6f-7d28b1aa9fa0" name="Page-1">7VrbctowEP0aHtvxTdg8BkJSpintlKZN+tJRbGFrIixGiAL9+kq2fAfiOhjIFF6wdqWVtOfsauVxxxzM1rcMzoNP1EOkY2jeumNedwzD0GxN/EnJJpb0eiAW+Ax7sUjPBBP8BymhGucvsYcWhY6cUsLxvCh0aRgilxdkkDG6KnabUlKcdQ59VBFMXEiq0h/Y40EsdYCWyT8g7AfJzLqmNE/QffYZXYZqvo5hTqNfrJ7BxJbqvwigR1c5kTnsmANGKY+fZusBItK3idvicTc7tOm6GQp5rQGOY1sAOK6NtCdNs98pC78hWaJkC9FC+SZxjrAgcBCNfsBnRMh08Sg2Mpf6BYeMTzjkUj/FhAwooSwaaGrRT3bmjD6jnGY6VZrIdcjLjMbekSqXzrCrngl8QqSf+jqxFNIwmpaG/AbOMJHs+46YB0OoxIpquqHa21YHCfZDIXOFD5FQ9j3MBMkwlcIFXUoI+8pNiHG03ul7PUVURAqiM8TZRnRRAxzFARUjaXuVMc5UoiBHtkQGFcf91HCGs3hQUNeE3ajAPv787dfX+/F4NL6tMKCI0irAHE3m0JXalUgIRWZA5iqvG1aVEzI23P2cOBMWHABxvVeCvAsqkOvGFsy7bWBuViPbE1lQNSnjAfVpCMkwk+ZxRWvMH3LPjxKV90C2QrG0BwVS1Mh0+zFDoXcl07ecf47CWKLQcvaSpCbaCaoETXmKqdx2AVER5MxFO9xmqfMI
 Mh/xHX1AbWYwRCDHv4vzHxRmqxLal7A+bFhb2jmFNajgfTMaX92Nfl4gPxzkXeOcIO++LpO//Zyc3BH25WTjjHKyfTl6m8Bs1IDZOjzMaugXioXFNAWYpWLOLBfm8VbUqPwd7AVDhlYyFO+3YihiXbqfZkR0KofFx9Hd3fD6clC0VBuYzikPil61NrgaXeBuD26rd0q4k9dpObwn94PBcHh9gby1UvC0Ea7rFWAblRaqnFDFhV4oLbJK47EWaG+htKhzq3eOVVoAs0QpCzQrLbovGWqxtNC3vDn+Jya+Gb7tjpkmTAQ1mJgk9vapaNslBoGmVLR729PkMaj42ledCRX1zpnfqlIqFm6GNmiRi71jUbFbuXA1pWLZUO+IVLQOlRX/MyrWOaCPR8XyuWo0vPuXT3pQ5vQOKgok4SbXbS47LHYv2OyWFqxre9dV7g8MUIqEeAWHiwtwSdGtpeijFa4nj4uXqSia2ecVcffsGxZz+Bc=</diagram></mxfile>

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java
new file mode 100644
index 0000000..5001cfa
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+import java.util.Map;
+
+/**
+ * Finalize a job.
+ *
+ * <p>A Finalizer is handed to {@code Statusable#submit} together with the job configuration.
+ *
+ * @param <PAGE_T> Type for the Pageable.
+ */
+public interface Finalizer<PAGE_T> {
+
+  /**
+   * Run any routines for finalizing a job.
+   *
+   * @param config options to be used by the finalization process.
+   * @return Pageable results.
+   * @throws JobException declared for failures during finalization; the exact conditions are
+   *     implementation-specific.
+   */
+  Pageable<PAGE_T> finalizeJob(Map<String, Object> config) throws JobException;
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java
new file mode 100644
index 0000000..10096cd
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+/**
+ * Checked exception declared by the job API in this package (for example by
+ * {@code Finalizer#finalizeJob}).
+ */
+public class JobException extends Exception {
+
+  /**
+   * Creates an exception with a detail message and no cause.
+   *
+   * @param message detail message, passed through to {@link Exception#Exception(String)}.
+   */
+  public JobException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception with a detail message that wraps an underlying cause.
+   *
+   * @param message detail message.
+   * @param cause underlying failure, retrievable via {@link #getCause()}.
+   */
+  public JobException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
index ec006fb..5a2f485 100644
--- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
@@ -18,8 +18,6 @@
 
 package org.apache.metron.job;
 
-import org.apache.hadoop.fs.Path;
-
 /**
  * Capture metadata about a batch job.
  */
@@ -29,6 +27,7 @@ public class JobStatus {
     NOT_RUNNING,
     RUNNING,
     SUCCEEDED,
+    FINALIZING,
     FAILED,
     KILLED
   }
@@ -37,7 +36,7 @@ public class JobStatus {
   private State state = State.NOT_RUNNING;
   private double percentComplete = 0.0;
   private String description;
-  private Path resultPath;
+  private long completionTime;
 
   public JobStatus withJobId(String jobId) {
     this.jobId = jobId;
@@ -59,11 +58,15 @@ public class JobStatus {
     return this;
   }
 
-  public JobStatus withResultPath(Path resultPath) {
-    this.resultPath = resultPath;
+  public JobStatus withCompletionTime(long completionTime) {
+    this.completionTime = completionTime;
     return this;
   }
 
+  public String getJobId() {
+    return jobId;
+  }
+
   public State getState() {
     return state;
   }
@@ -76,8 +79,8 @@ public class JobStatus {
     return description;
   }
 
-  public Path getResultPath() {
-    return resultPath;
+  public long getCompletionTime() {
+    return completionTime;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
index 1038ab8..d93c7de 100644
--- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
@@ -18,14 +18,7 @@
 
 package org.apache.metron.job;
 
-public interface Pageable<T> {
-
-  /**
-   * Transform into an Iterable.
-   *
-   * @return Iterable version of this Pageable.
-   */
-  Iterable<T> asIterable();
+public interface Pageable<T> extends Iterable<T> {
 
   /**
    * Provides access to a specific page of results in the result set.
@@ -35,4 +28,11 @@ public interface Pageable<T> {
    */
   T getPage(int num);
 
+  /**
+   * Number of pages in this Pageable.
+   *
+   * @return number of pages
+   */
+  int getSize();
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
index 7a8fc02..9bdea35 100644
--- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
@@ -18,20 +18,44 @@
 
 package org.apache.metron.job;
 
-import java.io.IOException;
 import java.util.Map;
 
 /**
  * Abstraction for getting status on running jobs. Also provides options for killing and validating.
  */
-public interface Statusable {
+public interface Statusable<PAGE_T> {
+
+  enum JobType {
+    MAP_REDUCE;
+  }
+
+  /**
+   * Submit the job asynchronously.
+   *
+   * @return self
+   */
+  Statusable<PAGE_T> submit(Finalizer<PAGE_T> finalizer, Map<String, Object> configuration) throws JobException;
+
+  /**
+   * Synchronous call.
+   *
+   * @return pages of results
+   */
+  Pageable<PAGE_T> get() throws JobException, InterruptedException;
+
+  /**
+   * Execution framework type of this job.
+   *
+   * @return type of job
+   */
+  JobType getJobType();
 
   /**
    * Current job status.
    *
    * @return status
    */
-  JobStatus getStatus();
+  JobStatus getStatus() throws JobException;
 
   /**
    * Completion flag.
@@ -43,7 +67,7 @@ public interface Statusable {
   /**
    * Kill job.
    */
-  void kill() throws IOException;
+  void kill() throws JobException;
 
   /**
    * Validate job after submitted.

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
new file mode 100644
index 0000000..bf0baa7
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job.manager;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Statusable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link JobManager} that keeps track of submitted jobs in memory, grouped by username and
+ * keyed by job ID within each user.
+ *
+ * <p>Both the per-user map and each user's job map are wrapped with
+ * {@link Collections#synchronizedMap(Map)}. Nothing is persisted; tracked jobs are lost when
+ * this instance is discarded.
+ *
+ * @param <PAGE_T> page type of the results produced by the managed jobs.
+ */
+public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  // username -> (job ID -> job)
+  private final Map<String, Map<String, Statusable<PAGE_T>>> jobs;
+
+  public InMemoryJobManager() {
+    this.jobs = Collections.synchronizedMap(new HashMap<>());
+  }
+
+  /**
+   * Obtains a job from the supplier and registers it for the given user under the job ID
+   * reported by the job's status.
+   *
+   * @param jobSupplier supplies the (typically already submitted) job to track.
+   * @param username user the job is registered for.
+   * @return the job's status as reported after registration.
+   * @throws JobException if the job's status cannot be retrieved.
+   */
+  @Override
+  public JobStatus submit(Supplier<Statusable<PAGE_T>> jobSupplier, String username)
+      throws JobException {
+    Statusable<PAGE_T> job = jobSupplier.get();
+    String jobId = job.getStatus().getJobId();
+    // computeIfAbsent creates the user's map atomically. A separate "get or create" followed
+    // by put() lets two concurrent first submissions each create a map, dropping one job.
+    jobs.computeIfAbsent(username, user -> Collections.synchronizedMap(new HashMap<>()))
+        .put(jobId, job);
+    return job.getStatus();
+  }
+
+  /**
+   * Returns the current status of a tracked job.
+   *
+   * @throws JobException if no such job is tracked for the user, or the status lookup fails.
+   */
+  @Override
+  public JobStatus getStatus(String username, String jobId) throws JobException {
+    return requireJob(username, jobId).getStatus();
+  }
+
+  /**
+   * Reports whether a tracked job is done.
+   *
+   * @throws JobException if no such job is tracked for the user.
+   */
+  @Override
+  public boolean done(String username, String jobId) throws JobException {
+    return requireJob(username, jobId).isDone();
+  }
+
+  /**
+   * Kills a tracked job.
+   *
+   * @throws JobException if no such job is tracked for the user, or the kill fails.
+   */
+  @Override
+  public void killJob(String username, String jobId) throws JobException {
+    requireJob(username, jobId).kill();
+  }
+
+  /**
+   * Looks up a tracked job.
+   *
+   * @return the job, or {@code null} if the user or job ID is unknown.
+   */
+  @Override
+  public Statusable<PAGE_T> getJob(String username, String jobId) throws JobException {
+    return getUserJobs(username).get(jobId);
+  }
+
+  /**
+   * Returns a snapshot of all jobs tracked for the user.
+   *
+   * @return a new list of the user's jobs; empty if the user has none.
+   */
+  @Override
+  public List<Statusable<PAGE_T>> getJobs(String username) throws JobException {
+    return new ArrayList<Statusable<PAGE_T>>(getUserJobs(username).values());
+  }
+
+  /** Read-only lookup of a user's jobs; never registers anything for an unknown user. */
+  private Map<String, Statusable<PAGE_T>> getUserJobs(String username) {
+    Map<String, Statusable<PAGE_T>> userJobs = jobs.get(username);
+    return userJobs != null ? userJobs : Collections.<String, Statusable<PAGE_T>>emptyMap();
+  }
+
+  /** Like {@link #getJob} but fails with a descriptive exception instead of yielding null. */
+  private Statusable<PAGE_T> requireJob(String username, String jobId) throws JobException {
+    Statusable<PAGE_T> job = getJob(username, jobId);
+    if (job == null) {
+      throw new JobException("Could not find job '" + jobId + "' for user '" + username + "'");
+    }
+    return job;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java
new file mode 100644
index 0000000..eff60e5
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job.manager;
+
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Statusable;
+
+/**
+ * Manages jobs on a per-user basis: accepting new jobs and looking up, checking, and killing
+ * previously submitted jobs by username and job ID.
+ *
+ * @param <PAGE_T> page type parameter passed through to the managed {@link Statusable} jobs.
+ */
+public interface JobManager<PAGE_T> {
+
+  /**
+   * Accepts a job, obtained from the supplier, on behalf of a user.
+   *
+   * @param jobSupplier supplies the job to manage.
+   * @param username user the job belongs to.
+   * @return status of the job.
+   * @throws JobException declared for failures while accepting the job; exact conditions are
+   *     implementation-specific.
+   */
+  JobStatus submit(Supplier<Statusable<PAGE_T>> jobSupplier, String username) throws JobException;
+
+  /**
+   * Status of one of a user's jobs.
+   *
+   * @param username user the job belongs to.
+   * @param jobId ID of the job.
+   * @return status of the job.
+   */
+  JobStatus getStatus(String username, String jobId) throws JobException;
+
+  /**
+   * Completion flag for one of a user's jobs.
+   *
+   * @param username user the job belongs to.
+   * @param jobId ID of the job.
+   * @return presumably true once the job is done (mirrors {@code Statusable#isDone}) - confirm
+   *     against the implementation in use.
+   */
+  boolean done(String username, String jobId) throws JobException;
+
+  /**
+   * Kill one of a user's jobs.
+   *
+   * @param username user the job belongs to.
+   * @param jobId ID of the job.
+   */
+  void killJob(String username, String jobId) throws JobException;
+
+  /**
+   * Look up one of a user's jobs.
+   *
+   * @param username user the job belongs to.
+   * @param jobId ID of the job.
+   * @return the job; NOTE(review): behavior for an unknown user or job ID is not specified
+   *     here and depends on the implementation.
+   */
+  Statusable<PAGE_T> getJob(String username, String jobId) throws JobException;
+
+  /**
+   * All jobs belonging to a user.
+   *
+   * @param username user whose jobs are requested.
+   * @return the user's jobs.
+   */
+  List<Statusable<PAGE_T>> getJobs(String username) throws JobException;
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java b/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java
new file mode 100644
index 0000000..f3a3978
--- /dev/null
+++ b/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job.manager;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.JobStatus.State;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.job.Statusable.JobType;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Unit tests for {@link InMemoryJobManager}. Jobs, the finalizer and the results are Mockito
+ * mocks, so only the manager's bookkeeping (registering and looking up jobs per user) is
+ * exercised.
+ */
+public class InMemoryJobManagerTest {
+
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+  @Mock
+  private Statusable<Path> job1;
+  @Mock
+  private Statusable<Path> job2;
+  @Mock
+  private Statusable<Path> job3;
+  @Mock
+  private Finalizer<Path> finalizer;
+  @Mock
+  private Pageable<Path> results;
+  private JobManager<Path> jm;
+  private Map<String, Object> config;
+  private String username1;
+  private String username2;
+  private String jobId1;
+  private String jobId2;
+  private String jobId3;
+  // NOTE(review): assigned in setup() but not read by any test in this class.
+  private String basePath;
+
+  /**
+   * Creates a fresh manager and stubs the mocks: every job reports the MAP_REDUCE type and
+   * returns itself from submit(), and the finalizer returns the mocked results.
+   */
+  @Before
+  public void setup() throws JobException {
+    MockitoAnnotations.initMocks(this);
+    jm = new InMemoryJobManager<Path>();
+    config = new HashMap<>();
+    username1 = "user123";
+    username2 = "user456";
+    jobId1 = "job_abc_123";
+    jobId2 = "job_def_456";
+    jobId3 = "job_ghi_789";
+    basePath = tempDir.getRoot().getAbsolutePath();
+    when(job1.getJobType()).thenReturn(JobType.MAP_REDUCE);
+    when(job2.getJobType()).thenReturn(JobType.MAP_REDUCE);
+    when(job3.getJobType()).thenReturn(JobType.MAP_REDUCE);
+    // submit() returning the mock itself is what lets newSupplier(job) hand the mock to jm.
+    when(job1.submit(finalizer, config)).thenReturn(job1);
+    when(job2.submit(finalizer, config)).thenReturn(job2);
+    when(job3.submit(finalizer, config)).thenReturn(job3);
+    when(finalizer.finalizeJob(any())).thenReturn(results);
+  }
+
+  /** The status returned by submit and by a later getStatus reflects what the job reports. */
+  @Test
+  public void submits_job_and_returns_status() throws JobException {
+    when(job1.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId1));
+    JobStatus status = jm.submit(newSupplier(job1), username1);
+    assertThat(status.getState(), equalTo(State.RUNNING));
+    assertThat(status.getJobId(), equalTo(jobId1));
+    // Re-stub the job so the follow-up lookup observes a changed state.
+    when(job1.getStatus()).thenReturn(new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1));
+    status = jm.getStatus(username1, status.getJobId());
+    assertThat(status.getState(), equalTo(State.SUCCEEDED));
+    assertThat(status.getJobId(), equalTo(jobId1));
+  }
+
+  /** Jobs accumulate per user, and one user's jobs do not disturb another user's. */
+  @Test
+  public void submits_multiple_jobs_and_returns_status() throws JobException {
+    when(job1.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId1));
+    when(job2.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId2));
+    when(job3.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId3));
+
+    // user has 1 job
+    jm.submit(newSupplier(job1), username1);
+    assertThat(jm.getJob(username1, jobId1), equalTo(job1));
+
+    // user has 2 jobs
+    jm.submit(newSupplier(job2), username1);
+    assertThat(jm.getJob(username1, jobId1), equalTo(job1));
+    assertThat(jm.getJob(username1, jobId2), equalTo(job2));
+
+    // user has 3 jobs
+    jm.submit(newSupplier(job3), username1);
+    assertThat(jm.getJob(username1, jobId1), equalTo(job1));
+    assertThat(jm.getJob(username1, jobId2), equalTo(job2));
+    assertThat(jm.getJob(username1, jobId3), equalTo(job3));
+
+    // multiple users have 3 jobs
+    jm.submit(newSupplier(job1), username2);
+    jm.submit(newSupplier(job2), username2);
+    jm.submit(newSupplier(job3), username2);
+    // user 1 still good
+    assertThat(jm.getJob(username1, jobId1), equalTo(job1));
+    assertThat(jm.getJob(username1, jobId2), equalTo(job2));
+    assertThat(jm.getJob(username1, jobId3), equalTo(job3));
+    // and also user 2
+    assertThat(jm.getJob(username2, jobId1), equalTo(job1));
+    assertThat(jm.getJob(username2, jobId2), equalTo(job2));
+    assertThat(jm.getJob(username2, jobId3), equalTo(job3));
+  }
+
+  /** getStatus hands back the very status object the job reports. */
+  @Test
+  public void returns_job_status() throws JobException {
+    JobStatus expected = new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1);
+    when(job1.getStatus()).thenReturn(expected);
+    jm.submit(newSupplier(job1), username1);
+    JobStatus status = jm.getStatus(username1, jobId1);
+    assertThat(status, equalTo(expected));
+  }
+
+  /** done() reflects the job's own isDone() flag. */
+  @Test
+  public void returns_job_is_done() throws JobException {
+    JobStatus expected = new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1);
+    when(job1.getStatus()).thenReturn(expected);
+    when(job1.isDone()).thenReturn(true);
+    jm.submit(newSupplier(job1), username1);
+    boolean done = jm.done(username1, jobId1);
+    assertThat(done, equalTo(true));
+  }
+
+  /** killJob delegates to the job's kill(). */
+  @Test
+  public void kills_job() throws JobException {
+    when(job1.getStatus()).thenReturn(new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1));
+    jm.submit(newSupplier(job1), username1);
+    jm.killJob(username1, jobId1);
+    verify(job1).kill();
+  }
+
+  /** getJobs returns every job submitted for the requested user. */
+  @Test
+  public void gets_list_of_user_jobs() throws JobException {
+    when(job1.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId1));
+    when(job2.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId2));
+    when(job3.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId3));
+    jm.submit(newSupplier(job1), username1);
+    jm.submit(newSupplier(job2), username1);
+    jm.submit(newSupplier(job3), username1);
+    jm.submit(newSupplier(job1), username2);
+    jm.submit(newSupplier(job2), username2);
+    jm.submit(newSupplier(job3), username2);
+    List<Statusable<Path>> jobsUser1 = jm.getJobs(username1);
+    List<Statusable<Path>> jobsUser2 = jm.getJobs(username2);
+    assertThat("Wrong size", jobsUser1.size(), equalTo(3));
+    assertThat("Wrong size", jobsUser2.size(), equalTo(3));
+    assertThat("", jobsUser1.containsAll(Arrays.asList(job1, job2, job3)), equalTo(true));
+    assertThat("", jobsUser2.containsAll(Arrays.asList(job1, job2, job3)), equalTo(true));
+  }
+
+  /**
+   * Wraps a job in a Supplier that submits it with the shared finalizer and config. The checked
+   * JobException is rethrown as a RuntimeException because Supplier.get() cannot declare it.
+   */
+  private Supplier<Statusable<Path>> newSupplier(Statusable<Path> job) {
+    return () -> {
+      try {
+        return job.submit(finalizer, config);
+      } catch (JobException e) {
+        throw new RuntimeException("Something went wrong", e);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index c3b1a69..fb523ee 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -218,6 +218,12 @@
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
deleted file mode 100644
index 1d8e3f3..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcap.query;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.system.Clock;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.UUID;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-public class CliConfig {
-  public interface PrefixStrategy extends Function<Clock, String>{}
-
-  private boolean showHelp;
-  private String prefix;
-  private String basePath;
-  private String baseOutputPath;
-  private long startTime;
-  private long endTime;
-  private int numReducers;
-  private int numRecordsPerFile;
-  private DateFormat dateFormat;
-
-
-  public CliConfig(PrefixStrategy prefixStrategy) {
-    showHelp = false;
-    basePath = "";
-    baseOutputPath = "";
-    startTime = -1L;
-    endTime = -1L;
-    numReducers = 0;
-    prefix = prefixStrategy.apply(new Clock());
-  }
-
-  public String getPrefix() {
-    return prefix;
-  }
-
-  public void setPrefix(String prefix) {
-    this.prefix = prefix;
-  }
-
-  public int getNumReducers() {
-    return numReducers;
-  }
-
-  public boolean showHelp() {
-    return showHelp;
-  }
-
-  public void setShowHelp(boolean showHelp) {
-    this.showHelp = showHelp;
-  }
-
-  public String getBasePath() {
-    return basePath;
-  }
-
-  public String getBaseOutputPath() {
-    return baseOutputPath;
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public long getEndTime() {
-    return endTime;
-  }
-
-  public void setBasePath(String basePath) {
-    this.basePath = basePath;
-  }
-
-  public void setBaseOutputPath(String baseOutputPath) {
-    this.baseOutputPath = baseOutputPath;
-  }
-
-  public void setStartTime(long startTime) {
-    this.startTime = startTime;
-  }
-
-  public void setEndTime(long endTime) {
-    this.endTime = endTime;
-  }
-
-  public boolean isNullOrEmpty(String val) {
-    return StringUtils.isEmpty(val);
-  }
-
-  public void setDateFormat(String dateFormat) {
-    this.dateFormat = new SimpleDateFormat(dateFormat);
-  }
-
-  public DateFormat getDateFormat() {
-    return dateFormat;
-  }
-
-  public void setNumReducers(int numReducers) {
-    this.numReducers = numReducers;
-  }
-
-  public int getNumRecordsPerFile() {
-    return numRecordsPerFile;
-  }
-
-  public void setNumRecordsPerFile(int numRecordsPerFile) {
-    this.numRecordsPerFile = numRecordsPerFile;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index d5976ae..e6534c5 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -19,19 +19,20 @@
 package org.apache.metron.pcap.query;
 
 import org.apache.commons.cli.*;
+import org.apache.metron.pcap.config.PcapConfig;
 
 /**
  * Provides commmon required fields for the PCAP filter jobs
  */
 public class CliParser {
   public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
-  public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
+  public static final String BASE_INTERIM_OUTPUT_PATH_DEFAULT = "/tmp";
   public static final int NUM_REDUCERS_DEFAULT = 10;
   public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
   private CommandLineParser parser;
-  protected CliConfig.PrefixStrategy prefixStrategy;
+  protected PcapConfig.PrefixStrategy prefixStrategy;
 
-  public CliParser(CliConfig.PrefixStrategy prefixStrategy) {
+  public CliParser(PcapConfig.PrefixStrategy prefixStrategy) {
     this.prefixStrategy = prefixStrategy;
     parser = new PosixParser();
   }
@@ -40,7 +41,8 @@ public class CliParser {
     Options options = new Options();
     options.addOption(newOption("h", "help", false, "Display help"));
     options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT)));
-    options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", BASE_OUTPUT_PATH_DEFAULT)));
+    options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'",
+        BASE_INTERIM_OUTPUT_PATH_DEFAULT)));
     options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
     options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT)));
     options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT)));
@@ -59,7 +61,7 @@ public class CliParser {
     return option;
   }
 
-  public void parse(CommandLine commandLine, CliConfig config) throws java.text.ParseException {
+  public void parse(CommandLine commandLine, PcapConfig config) throws java.text.ParseException {
     if (commandLine.hasOption("help")) {
       config.setShowHelp(true);
     }
@@ -72,18 +74,18 @@ public class CliParser {
       config.setBasePath(BASE_PATH_DEFAULT);
     }
     if (commandLine.hasOption("base_output_path")) {
-      config.setBaseOutputPath(commandLine.getOptionValue("base_output_path"));
+      config.setBaseInterimResultPath(commandLine.getOptionValue("base_output_path"));
     } else {
-      config.setBaseOutputPath(BASE_OUTPUT_PATH_DEFAULT);
+      config.setBaseInterimResultPath(BASE_INTERIM_OUTPUT_PATH_DEFAULT);
     }
     if (commandLine.hasOption("start_time")) {
       try {
         if (commandLine.hasOption("date_format")) {
           long startTime = config.getDateFormat().parse(commandLine.getOptionValue("start_time")).getTime();
-          config.setStartTime(startTime);
+          config.setStartTimeMs(startTime);
         } else {
           long startTime = Long.parseLong(commandLine.getOptionValue("start_time"));
-          config.setStartTime(startTime);
+          config.setStartTimeMs(startTime);
         }
       } catch (NumberFormatException nfe) {
         //no-op
@@ -107,10 +109,10 @@ public class CliParser {
       try {
         if (commandLine.hasOption("date_format")) {
           long endTime = config.getDateFormat().parse(commandLine.getOptionValue("end_time")).getTime();
-          config.setEndTime(endTime);
+          config.setEndTimeMs(endTime);
         } else {
           long endTime = Long.parseLong(commandLine.getOptionValue("end_time"));
-          config.setEndTime(endTime);
+          config.setEndTimeMs(endTime);
         }
       } catch (NumberFormatException nfe) {
         //no-op

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
deleted file mode 100644
index 03caed7..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcap.query;
-
-import org.apache.metron.common.Constants;
-
-import java.util.EnumMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class FixedCliConfig extends CliConfig {
-
-  private Map<String, String> fixedFields;
-
-  public FixedCliConfig(PrefixStrategy prefixStrategy) {
-    super(prefixStrategy);
-    this.fixedFields = new LinkedHashMap<>();
-  }
-
-  public Map<String, String> getFixedFields() {
-    return fixedFields;
-  }
-
-  public void setFixedFields(Map<String, String> fixedFields) {
-    this.fixedFields = fixedFields;
-  }
-
-  public void putFixedField(String key, String value) {
-    String trimmedVal = value != null ? value.trim() : null;
-    if (!isNullOrEmpty(trimmedVal)) {
-      this.fixedFields.put(key, value);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
index 4e1bfcf..19d351c 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
@@ -22,11 +22,13 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.metron.common.Constants;
 import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapConfig;
 
 public class FixedCliParser extends CliParser {
   private Options fixedOptions;
 
-  public FixedCliParser(CliConfig.PrefixStrategy prefixStrategy) {
+  public FixedCliParser(PcapConfig.PrefixStrategy prefixStrategy) {
     super(prefixStrategy);
     fixedOptions = buildFixedOptions();
   }
@@ -51,9 +53,9 @@ public class FixedCliParser extends CliParser {
    * @return Configuration tailored to fixed pcap queries
    * @throws ParseException
    */
-  public FixedCliConfig parse(String[] args) throws ParseException, java.text.ParseException {
+  public FixedPcapConfig parse(String[] args) throws ParseException, java.text.ParseException {
     CommandLine commandLine = getParser().parse(fixedOptions, args);
-    FixedCliConfig config = new FixedCliConfig(prefixStrategy);
+    FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
     super.parse(commandLine, config);
     config.putFixedField(Constants.Fields.SRC_ADDR.getName(), commandLine.getOptionValue("ip_src_addr"));
     config.putFixedField(Constants.Fields.DST_ADDR.getName(), commandLine.getOptionValue("ip_dst_addr"));
@@ -63,7 +65,7 @@ public class FixedCliParser extends CliParser {
     config.putFixedField(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), Boolean.toString(commandLine.hasOption("include_reverse")));
     config.putFixedField(PcapHelper.PacketFields.PACKET_FILTER.getName(), commandLine.getOptionValue("packet_filter"));
     if(commandLine.hasOption("prefix")) {
-      config.setPrefix(commandLine.getOptionValue("prefix"));
+      config.setFinalFilenamePrefix(commandLine.getOptionValue("prefix"));
     }
     return config;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index 0fda801..3462921 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -27,45 +27,46 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.common.hadoop.SequenceFileIterable;
-import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapConfig;
+import org.apache.metron.pcap.config.QueryPcapConfig;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies;
 import org.apache.metron.pcap.mr.PcapJob;
-import org.apache.metron.pcap.writer.ResultsWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PcapCli {
   private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  public static final CliConfig.PrefixStrategy PREFIX_STRATEGY = clock -> {
-    String timestamp = new Clock().currentTimeFormatted("yyyyMMddHHmm");
+  public static final PcapConfig.PrefixStrategy PREFIX_STRATEGY = clock -> {
+    String timestamp = clock.currentTimeFormatted("yyyyMMddHHmm");
     String uuid = UUID.randomUUID().toString().replaceAll("-", "");
     return String.format("%s-%s", timestamp, uuid);
   };
   private final PcapJob jobRunner;
-  private final ResultsWriter resultsWriter;
-  private final CliConfig.PrefixStrategy prefixStrategy;
+  private final PcapConfig.PrefixStrategy prefixStrategy;
 
   public static void main(String[] args) {
-    int status = new PcapCli(new PcapJob(), new ResultsWriter(), PREFIX_STRATEGY).run(args);
+    int status = new PcapCli(new PcapJob(), PREFIX_STRATEGY).run(args);
     System.exit(status);
   }
 
-  public PcapCli(PcapJob jobRunner, ResultsWriter resultsWriter, CliConfig.PrefixStrategy prefixStrategy) {
+  public PcapCli(PcapJob jobRunner, PcapConfig.PrefixStrategy prefixStrategy) {
     this.jobRunner = jobRunner;
-    this.resultsWriter = resultsWriter;
     this.prefixStrategy = prefixStrategy;
   }
 
+
   public int run(String[] args) {
     if (args.length < 1) {
       printBasicHelp();
       return -1;
     }
     String jobType = args[0];
-    SequenceFileIterable results = null;
     String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
     Configuration hadoopConf = new Configuration();
     String[] otherArgs = null;
@@ -75,13 +76,18 @@ public class PcapCli {
       LOGGER.error("Failed to configure hadoop with provided options: {}", e.getMessage(), e);
       return -1;
     }
-    CliConfig commonConfig = null;
+    PcapConfig commonConfig = null;
+    Pageable<Path> results;
+    // write to local FS in the executing directory
+    String execDir = System.getProperty("user.dir");
+
     if ("fixed".equals(jobType)) {
       FixedCliParser fixedParser = new FixedCliParser(prefixStrategy);
-      FixedCliConfig config = null;
+      FixedPcapConfig config = null;
       try {
         config = fixedParser.parse(otherArgs);
         commonConfig = config;
+        PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path("file:///" + execDir));
       } catch (ParseException | java.text.ParseException e) {
         System.err.println(e.getMessage());
         System.err.flush();
@@ -92,28 +98,24 @@ public class PcapCli {
         fixedParser.printHelp();
         return 0;
       }
-      Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime());
+      Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs());
       long startTime = time.getLeft();
       long endTime = time.getRight();
 
+      PcapOptions.START_TIME_NS.put(commonConfig, startTime);
+      PcapOptions.END_TIME_NS.put(commonConfig, endTime);
+      PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
+      PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
       try {
-        results = jobRunner.query(
-                new Path(config.getBasePath()),
-                new Path(config.getBaseOutputPath()),
-                startTime,
-                endTime,
-                config.getNumReducers(),
-                config.getFixedFields(),
-                hadoopConf,
-                FileSystem.get(hadoopConf),
-                new FixedPcapFilter.Configurator());
-      } catch (IOException | ClassNotFoundException | InterruptedException e) {
+        PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf));
+        results = jobRunner.submit(PcapFinalizerStrategies.CLI, commonConfig).get();
+      } catch (IOException|InterruptedException | JobException e) {
         LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), e);
         return -1;
       }
     } else if ("query".equals(jobType)) {
       QueryCliParser queryParser = new QueryCliParser(prefixStrategy);
-      QueryCliConfig config = null;
+      QueryPcapConfig config = null;
       try {
         config = queryParser.parse(otherArgs);
         commonConfig = config;
@@ -126,23 +128,20 @@ public class PcapCli {
         queryParser.printHelp();
         return 0;
       }
-      Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime());
+      Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs());
       long startTime = time.getLeft();
       long endTime = time.getRight();
 
+      PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path("file:///" + execDir));
+      PcapOptions.START_TIME_NS.put(commonConfig, startTime);
+      PcapOptions.END_TIME_NS.put(commonConfig, endTime);
+      PcapOptions.FILTER_IMPL.put(commonConfig, new org.apache.metron.pcap.filter.query.QueryPcapFilter.Configurator());
+      PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
       try {
-        results = jobRunner.query(
-                new Path(config.getBasePath()),
-                new Path(config.getBaseOutputPath()),
-                startTime,
-                endTime,
-                config.getNumReducers(),
-                config.getQuery(),
-                hadoopConf,
-                FileSystem.get(hadoopConf),
-                new QueryPcapFilter.Configurator());
-      } catch (IOException | ClassNotFoundException | InterruptedException e) {
-        LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), e);
+        PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf));
+        results = jobRunner.submit(PcapFinalizerStrategies.CLI, commonConfig).get();
+      } catch (IOException| InterruptedException | JobException e) {
+        LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), e);
         return -1;
       }
     } else {
@@ -150,17 +149,6 @@ public class PcapCli {
       return -1;
     }
 
-    try {
-      // write to local FS in the executing directory
-      String execDir = System.getProperty("user.dir");
-      jobRunner.writeResults(results, resultsWriter, new Path("file:///" + execDir),
-          commonConfig.getNumRecordsPerFile(),
-          commonConfig.getPrefix());
-    } catch (IOException e) {
-      LOGGER.error("Unable to write file", e);
-      return -1;
-    }
-
     return 0;
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
deleted file mode 100644
index 67f045f..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcap.query;
-
-public class QueryCliConfig extends CliConfig {
-  private String query;
-
-  public QueryCliConfig(PrefixStrategy prefixStrategy) {
-    super(prefixStrategy);
-  }
-
-  public String getQuery() {
-    return query;
-  }
-
-  public void setQuery(String query) {
-    this.query = query;
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
index d6e5cd1..b4375d1 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
@@ -20,11 +20,13 @@ package org.apache.metron.pcap.query;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.metron.pcap.config.PcapConfig;
+import org.apache.metron.pcap.config.QueryPcapConfig;
 
 public class QueryCliParser extends CliParser {
   private Options queryOptions;
 
-  public QueryCliParser(CliConfig.PrefixStrategy prefixStrategy) {
+  public QueryCliParser(PcapConfig.PrefixStrategy prefixStrategy) {
     super(prefixStrategy);
     queryOptions = setupOptions();
   }
@@ -43,15 +45,15 @@ public class QueryCliParser extends CliParser {
    * @return Configuration tailored to query pcap queries
    * @throws ParseException
    */
-  public QueryCliConfig parse(String[] args) throws ParseException, java.text.ParseException {
+  public QueryPcapConfig parse(String[] args) throws ParseException, java.text.ParseException {
     CommandLine commandLine = getParser().parse(queryOptions, args);
-    QueryCliConfig config = new QueryCliConfig(prefixStrategy);
+    QueryPcapConfig config = new QueryPcapConfig(prefixStrategy);
     super.parse(commandLine, config);
     if (commandLine.hasOption("query")) {
       config.setQuery(commandLine.getOptionValue("query"));
     }
     if(commandLine.hasOption("prefix")) {
-      config.setPrefix(commandLine.getOptionValue("prefix"));
+      config.setFinalFilenamePrefix(commandLine.getOptionValue("prefix"));
     }
     return config;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
index 5a5d406..1e389d9 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -19,15 +19,12 @@
 package org.apache.metron.pcap;
 
 import static java.lang.Long.toUnsignedString;
-import static java.lang.String.format;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.startsWith;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -39,9 +36,13 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.Finalizer;
 import org.apache.metron.job.JobStatus;
 import org.apache.metron.job.JobStatus.State;
+import org.apache.metron.job.Pageable;
 import org.apache.metron.job.Statusable;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapOptions;
 import org.apache.metron.pcap.filter.PcapFilterConfigurator;
 import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
 import org.apache.metron.pcap.mr.PcapJob;
@@ -54,33 +55,62 @@ import org.mockito.MockitoAnnotations;
 public class PcapJobTest {
 
   @Mock
-  private Job job;
+  private Job mrJob;
   @Mock
   private org.apache.hadoop.mapreduce.JobStatus mrStatus;
   @Mock
   private JobID jobId;
-  private static final String JOB_ID_VAL = "job_abc_123";
+  @Mock
+  private Finalizer<Path> finalizer;
+  private Pageable<Path> pageableResult;
+  private FixedPcapConfig config;
+  private Configuration hadoopConfig;
+  private FileSystem fileSystem;
+  private String jobIdVal = "job_abc_123";
   private Path basePath;
   private Path baseOutPath;
   private long startTime;
   private long endTime;
   private int numReducers;
+  private int numRecordsPerFile;
+  private Path finalOutputPath;
   private Map<String, String> fixedFields;
-  private Configuration hadoopConfig;
+  private PcapJob<Map<String, String>> testJob;
+
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     MockitoAnnotations.initMocks(this);
     basePath = new Path("basepath");
     baseOutPath = new Path("outpath");
     startTime = 100;
     endTime = 200;
     numReducers = 5;
+    numRecordsPerFile = 5;
     fixedFields = new HashMap<>();
     fixedFields.put("ip_src_addr", "192.168.1.1");
     hadoopConfig = new Configuration();
-    when(jobId.toString()).thenReturn(JOB_ID_VAL);
+    fileSystem = FileSystem.get(hadoopConfig);
+    finalOutputPath = new Path("finaloutpath");
+    when(jobId.toString()).thenReturn(jobIdVal);
     when(mrStatus.getJobID()).thenReturn(jobId);
+    pageableResult = new PcapPages();
+    // handles setting the file name prefix under the hood
+    config = new FixedPcapConfig(clock -> "clockprefix");
+    PcapOptions.HADOOP_CONF.put(config, hadoopConfig);
+    PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConfig));
+    PcapOptions.BASE_PATH.put(config, basePath);
+    PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath);
+    PcapOptions.START_TIME_NS.put(config, startTime);
+    PcapOptions.END_TIME_NS.put(config, endTime);
+    PcapOptions.NUM_REDUCERS.put(config, numReducers);
+    PcapOptions.FIELDS.put(config, fixedFields);
+    PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator());
+    PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile);
+    PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath);
+    testJob = new TestJob<>();
+    testJob.setStatusInterval(10);
+    testJob.setCompleteCheckInterval(10);
   }
 
   @Test
@@ -98,147 +128,94 @@ public class PcapJobTest {
         equalTo(8));
   }
 
-  private class TestJob extends PcapJob {
+  private class TestJob<T> extends PcapJob<T> {
 
     @Override
-    public <T> Job createJob(Optional<String> jobName, Path basePath, Path outputPath, long beginNS, long endNS,
-        int numReducers, T fields, Configuration conf, FileSystem fs,
+    public Job createJob(Optional<String> jobName,
+        Path basePath,
+        Path outputPath,
+        long beginNS,
+        long endNS,
+        int numReducers,
+        T fields,
+        Configuration conf,
+        FileSystem fs,
         PcapFilterConfigurator<T> filterImpl) throws IOException {
-      return job;
+      return mrJob;
     }
   }
 
   @Test
   public void job_succeeds_synchronously() throws Exception {
-    when(job.isComplete()).thenReturn(true);
+    pageableResult = new PcapPages(
+        Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt")));
+    when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
+    when(mrJob.isComplete()).thenReturn(true);
     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
-    when(job.getStatus()).thenReturn(mrStatus);
-    TestJob testJob = new TestJob();
-    Statusable statusable = testJob.query(
-        Optional.empty(),
-        basePath,
-        baseOutPath,
-        startTime,
-        endTime,
-        numReducers,
-        fixedFields,
-        hadoopConfig,
-        FileSystem.get(hadoopConfig),
-        new FixedPcapFilter.Configurator(),
-        true);
-    verify(job, times(1)).waitForCompletion(true);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    Pageable<Path> results = statusable.get();
+    Assert.assertThat(results.getSize(), equalTo(3));
     JobStatus status = statusable.getStatus();
     Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
     Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
-    String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString();
-    Assert.assertThat(status.getResultPath(), notNullValue());
-    Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath));
+    Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
   }
 
   @Test
   public void job_fails_synchronously() throws Exception {
-    when(job.isComplete()).thenReturn(true);
+    when(mrJob.isComplete()).thenReturn(true);
     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
-    when(job.getStatus()).thenReturn(mrStatus);
-    TestJob testJob = new TestJob();
-    Statusable statusable = testJob.query(
-        Optional.empty(),
-        basePath,
-        baseOutPath,
-        startTime,
-        endTime,
-        numReducers,
-        fixedFields,
-        hadoopConfig,
-        FileSystem.get(hadoopConfig),
-        new FixedPcapFilter.Configurator(),
-        true);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    Pageable<Path> results = statusable.get();
     JobStatus status = statusable.getStatus();
     Assert.assertThat(status.getState(), equalTo(State.FAILED));
     Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
-    String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString();
-    Assert.assertThat(status.getResultPath(), notNullValue());
-    Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath));
+    Assert.assertThat(results.getSize(), equalTo(0));
   }
 
   @Test
   public void job_fails_with_killed_status_synchronously() throws Exception {
-    when(job.isComplete()).thenReturn(true);
+    when(mrJob.isComplete()).thenReturn(true);
     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
-    when(job.getStatus()).thenReturn(mrStatus);
-    TestJob testJob = new TestJob();
-    Statusable statusable = testJob.query(
-        Optional.empty(),
-        basePath,
-        baseOutPath,
-        startTime,
-        endTime,
-        numReducers,
-        fixedFields,
-        hadoopConfig,
-        FileSystem.get(hadoopConfig),
-        new FixedPcapFilter.Configurator(),
-        true);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    Pageable<Path> results = statusable.get();
     JobStatus status = statusable.getStatus();
     Assert.assertThat(status.getState(), equalTo(State.KILLED));
     Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
-    String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString();
-    Assert.assertThat(status.getResultPath(), notNullValue());
-    Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath));
+    Assert.assertThat(results.getSize(), equalTo(0));
   }
 
   @Test
   public void job_succeeds_asynchronously() throws Exception {
-    when(job.isComplete()).thenReturn(true);
+    // report not-complete a few times before completing, so success is only observed after waiting
+    when(mrJob.isComplete()).thenReturn(false, false, false, true);
     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
-    when(job.getStatus()).thenReturn(mrStatus);
-    TestJob testJob = new TestJob();
-    Statusable statusable = testJob.query(
-        Optional.empty(),
-        basePath,
-        baseOutPath,
-        startTime,
-        endTime,
-        numReducers,
-        fixedFields,
-        hadoopConfig,
-        FileSystem.get(hadoopConfig),
-        new FixedPcapFilter.Configurator(),
-        false);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    while (!statusable.isDone()) {
+    }
     JobStatus status = statusable.getStatus();
     Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
     Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
-    String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString();
-    Assert.assertThat(status.getResultPath(), notNullValue());
-    Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath));
   }
 
   @Test
   public void job_reports_percent_complete() throws Exception {
-    when(job.isComplete()).thenReturn(false);
+    when(mrJob.isComplete()).thenReturn(false);
     when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
-    when(job.getStatus()).thenReturn(mrStatus);
-    TestJob testJob = new TestJob();
-    Statusable statusable = testJob.query(
-        Optional.empty(),
-        basePath,
-        baseOutPath,
-        startTime,
-        endTime,
-        numReducers,
-        fixedFields,
-        hadoopConfig,
-        FileSystem.get(hadoopConfig),
-        new FixedPcapFilter.Configurator(),
-        false);
-    when(job.mapProgress()).thenReturn(0.5f);
-    when(job.reduceProgress()).thenReturn(0f);
+    when(mrJob.getStatus()).thenReturn(mrStatus);
+    Statusable<Path> statusable = testJob.submit(finalizer, config);
+    when(mrJob.mapProgress()).thenReturn(0.5f);
+    when(mrJob.reduceProgress()).thenReturn(0f);
     JobStatus status = statusable.getStatus();
     Assert.assertThat(status.getState(), equalTo(State.RUNNING));
     Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%"));
     Assert.assertThat(status.getPercentComplete(), equalTo(25.0));
-    when(job.mapProgress()).thenReturn(1.0f);
-    when(job.reduceProgress()).thenReturn(0.5f);
+    when(mrJob.mapProgress()).thenReturn(1.0f);
+    when(mrJob.reduceProgress()).thenReturn(0.5f);
     status = statusable.getStatus();
     Assert.assertThat(status.getPercentComplete(), equalTo(75.0));
     Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%"));