You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/08/17 15:34:18 UTC
[03/51] [abbrv] metron git commit: METRON-1614: Create job status
abstraction (mmiklavc via mmiklavc) closes apache/metron#1108
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/metron-job_state_statechart_diagram.svg
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/metron-job_state_statechart_diagram.svg b/metron-platform/metron-job/metron-job_state_statechart_diagram.svg
new file mode 100644
index 0000000..a99c5ad
--- /dev/null
+++ b/metron-platform/metron-job/metron-job_state_statechart_diagram.svg
@@ -0,0 +1,14 @@
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="661px" height="291px" version="1.1" style="background-color: rgb(255, 255, 255);"><defs/><g transform="translate(0.5,0.5)"><ellipse cx="15" cy="30" rx="11" ry="11" fill="#000000" stroke="#ff0000" transform="rotate(90,15,30)" pointer-events="none"/><rect x="110" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(123.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="92" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 92px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-dec
oration:inherit;">NOT_RUNNING</div></div></foreignObject><text x="46" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">NOT_RUNNING</text></switch></g><path d="M 440 30 L 537.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 529.88 34.5 L 538.88 30 L 529.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="320" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(350.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="58" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 60px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://ww
w.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">RUNNING</div></div></foreignObject><text x="29" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">RUNNING</text></switch></g><rect x="540" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(563.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="72" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 74px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">FINALIZING</div></div></foreignObject><text x="36" y="12" fill="#0000
00" text-anchor="middle" font-size="12px" font-family="Verdana">FINALIZING</text></switch></g><path d="M 30 30 L 107.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 99.88 34.5 L 108.88 30 L 99.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 230 30 L 317.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 309.88 34.5 L 318.88 30 L 309.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="320" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(357.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0,
0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">KILLED</div></div></foreignObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">KILLED</text></switch></g><rect x="320" y="230" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(357.5,253.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml"
style="display:inline-block;text-align:inherit;text-decoration:inherit;">FAILED</div></div></foreignObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">FAILED</text></switch></g><rect x="540" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(562.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="75" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 76px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">SUCCEEDED</div></div></foreignObject><text x="38" y="12" fill="#000000" text-anchor="mid
dle" font-size="12px" font-family="Verdana">SUCCEEDED</text></switch></g><path d="M 380 60 L 380 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 375.5 109.88 L 380 118.88 L 384.5 109.88" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 600 60 L 600 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 595.5 109.88 L 600 118.88 L 604.5 109.88" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 540 45 L 500 45 Q 490 45 490 55 L 490 250 Q 490 260 480 260 L 442.24 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 450.12 255.5 L 441.12 260 L 450.12 264.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 320 45 L 290 45 Q 280 45 280 55 L 280 250 Q 280 260 290 260 L 317.76 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path
d="M 309.88 264.5 L 318.88 260 L 309.88 255.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 540 45 L 500 45 Q 490 45 490 55 L 490 140 Q 490 150 480 150 L 442.24 150" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 450.12 145.5 L 441.12 150 L 450.12 154.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/></g></svg>
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/metron-job_state_statechart_diagram.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/metron-job_state_statechart_diagram.xml b/metron-platform/metron-job/metron-job_state_statechart_diagram.xml
new file mode 100644
index 0000000..b9ee8aa
--- /dev/null
+++ b/metron-platform/metron-job/metron-job_state_statechart_diagram.xml
@@ -0,0 +1,14 @@
+<!--
+ Licensed to the Apache Software
+ Foundation (ASF) under one or more contributor license agreements. See the
+ NOTICE file distributed with this work for additional information regarding
+ copyright ownership. The ASF licenses this file to You under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software distributed
+ under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+ OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+ the specific language governing permissions and limitations under the License.
+ -->
+<!-- This is a draw.io diagram. You can load it from http://www.draw.io -->
+<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36" version="8.9.5" editor="www.draw.io" type="google"><diagram id="58cdce13-f638-feb5-8d6f-7d28b1aa9fa0" name="Page-1">7VrbctowEP0aHtvxTdg8BkJSpintlKZN+tJRbGFrIixGiAL9+kq2fAfiOhjIFF6wdqWVtOfsauVxxxzM1rcMzoNP1EOkY2jeumNedwzD0GxN/EnJJpb0eiAW+Ax7sUjPBBP8BymhGucvsYcWhY6cUsLxvCh0aRgilxdkkDG6KnabUlKcdQ59VBFMXEiq0h/Y40EsdYCWyT8g7AfJzLqmNE/QffYZXYZqvo5hTqNfrJ7BxJbqvwigR1c5kTnsmANGKY+fZusBItK3idvicTc7tOm6GQp5rQGOY1sAOK6NtCdNs98pC78hWaJkC9FC+SZxjrAgcBCNfsBnRMh08Sg2Mpf6BYeMTzjkUj/FhAwooSwaaGrRT3bmjD6jnGY6VZrIdcjLjMbekSqXzrCrngl8QqSf+jqxFNIwmpaG/AbOMJHs+46YB0OoxIpquqHa21YHCfZDIXOFD5FQ9j3MBMkwlcIFXUoI+8pNiHG03ul7PUVURAqiM8TZRnRRAxzFARUjaXuVMc5UoiBHtkQGFcf91HCGs3hQUNeE3ajAPv787dfX+/F4NL6tMKCI0irAHE3m0JXalUgIRWZA5iqvG1aVEzI23P2cOBMWHABxvVeCvAsqkOvGFsy7bWBuViPbE1lQNSnjAfVpCMkwk+ZxRWvMH3LPjxKV90C2QrG0BwVS1Mh0+zFDoXcl07ecf47CWKLQcvaSpCbaCaoETXmKqdx2AVER5MxFO9xmqfMI
Mh/xHX1AbWYwRCDHv4vzHxRmqxLal7A+bFhb2jmFNajgfTMaX92Nfl4gPxzkXeOcIO++LpO//Zyc3BH25WTjjHKyfTl6m8Bs1IDZOjzMaugXioXFNAWYpWLOLBfm8VbUqPwd7AVDhlYyFO+3YihiXbqfZkR0KofFx9Hd3fD6clC0VBuYzikPil61NrgaXeBuD26rd0q4k9dpObwn94PBcHh9gby1UvC0Ea7rFWAblRaqnFDFhV4oLbJK47EWaG+htKhzq3eOVVoAs0QpCzQrLbovGWqxtNC3vDn+Jya+Gb7tjpkmTAQ1mJgk9vapaNslBoGmVLR729PkMaj42ledCRX1zpnfqlIqFm6GNmiRi71jUbFbuXA1pWLZUO+IVLQOlRX/MyrWOaCPR8XyuWo0vPuXT3pQ5vQOKgok4SbXbS47LHYv2OyWFqxre9dV7g8MUIqEeAWHiwtwSdGtpeijFa4nj4uXqSia2ecVcffsGxZz+Bc=</diagram></mxfile>
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java
new file mode 100644
index 0000000..5001cfa
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Finalizer.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+import java.util.Map;
+
+/**
+ * Finalize a job.
+ *
+ * A Finalizer is passed to Statusable#submit together with the job configuration.
+ * When finalization is actually triggered is decided by the job implementation and
+ * cannot be determined from this interface alone.
+ *
+ * @param <PAGE_T> Type for the Pageable.
+ */
+public interface Finalizer<PAGE_T> {
+
+ /**
+ * Run any routines for finalizing a job.
+ *
+ * @param config options to be used by the finalization process.
+ * @return Pageable results.
+ * @throws JobException declared for implementations to signal a failed finalization;
+ *     the exact conditions are implementation-specific (TODO confirm against implementations).
+ */
+ Pageable<PAGE_T> finalizeJob(Map<String, Object> config) throws JobException;
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java
new file mode 100644
index 0000000..10096cd
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+/**
+ * Checked exception declared by the job abstractions in this package
+ * (e.g. Statusable#submit, Statusable#kill, Finalizer#finalizeJob).
+ */
+public class JobException extends Exception {
+
+ /**
+ * @param message description of the failure
+ */
+ public JobException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param message description of the failure
+ * @param cause underlying error being wrapped
+ */
+ public JobException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
index ec006fb..5a2f485 100644
--- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java
@@ -18,8 +18,6 @@
package org.apache.metron.job;
-import org.apache.hadoop.fs.Path;
-
/**
* Capture metadata about a batch job.
*/
@@ -29,6 +27,7 @@ public class JobStatus {
NOT_RUNNING,
RUNNING,
SUCCEEDED,
+ FINALIZING,
FAILED,
KILLED
}
@@ -37,7 +36,7 @@ public class JobStatus {
private State state = State.NOT_RUNNING;
private double percentComplete = 0.0;
private String description;
- private Path resultPath;
+ private long completionTime;
public JobStatus withJobId(String jobId) {
this.jobId = jobId;
@@ -59,11 +58,15 @@ public class JobStatus {
return this;
}
- public JobStatus withResultPath(Path resultPath) {
- this.resultPath = resultPath;
+ public JobStatus withCompletionTime(long completionTime) {
+ this.completionTime = completionTime;
return this;
}
+ public String getJobId() {
+ return jobId;
+ }
+
public State getState() {
return state;
}
@@ -76,8 +79,8 @@ public class JobStatus {
return description;
}
- public Path getResultPath() {
- return resultPath;
+ public long getCompletionTime() {
+ return completionTime;
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
index 1038ab8..d93c7de 100644
--- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Pageable.java
@@ -18,14 +18,7 @@
package org.apache.metron.job;
-public interface Pageable<T> {
-
- /**
- * Transform into an Iterable.
- *
- * @return Iterable version of this Pageable.
- */
- Iterable<T> asIterable();
+public interface Pageable<T> extends Iterable<T> {
/**
* Provides access to a specific page of results in the result set.
@@ -35,4 +28,11 @@ public interface Pageable<T> {
*/
T getPage(int num);
+ /**
+ * Number of pages in this Pageable.
+ *
+ * @return number of pages
+ */
+ int getSize();
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
index 7a8fc02..9bdea35 100644
--- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/Statusable.java
@@ -18,20 +18,44 @@
package org.apache.metron.job;
-import java.io.IOException;
import java.util.Map;
/**
* Abstraction for getting status on running jobs. Also provides options for killing and validating.
*/
-public interface Statusable {
+public interface Statusable<PAGE_T> {
+
+ enum JobType {
+ MAP_REDUCE;
+ }
+
+ /**
+ * Submit the job asynchronously.
+ *
+ * @return self
+ */
+ Statusable<PAGE_T> submit(Finalizer<PAGE_T> finalizer, Map<String, Object> configuration) throws JobException;
+
+ /**
+ * Synchronous call.
+ *
+ * @return pages of results
+ */
+ Pageable<PAGE_T> get() throws JobException, InterruptedException;
+
+ /**
+ * Execution framework type of this job.
+ *
+ * @return type of job
+ */
+ JobType getJobType();
/**
* Current job status.
*
* @return status
*/
- JobStatus getStatus();
+ JobStatus getStatus() throws JobException;
/**
* Completion flag.
@@ -43,7 +67,7 @@ public interface Statusable {
/**
* Kill job.
*/
- void kill() throws IOException;
+ void kill() throws JobException;
/**
* Validate job after submitted.
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
new file mode 100644
index 0000000..bf0baa7
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job.manager;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Statusable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link JobManager} that keeps every submitted job in an in-process map, keyed first by
+ * username and then by job id. Nothing is persisted.
+ *
+ * <p>Both map levels are wrapped with {@link Collections#synchronizedMap}. Per-user maps are
+ * registered with {@code computeIfAbsent} so that concurrent first submissions for the same
+ * user cannot overwrite each other's job map.
+ *
+ * @param <PAGE_T> page type of the results produced by the managed jobs
+ */
+public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final Map<String, Map<String, Statusable<PAGE_T>>> jobs;
+
+  public InMemoryJobManager() {
+    this.jobs = Collections.synchronizedMap(new HashMap<>());
+  }
+
+  /**
+   * Obtains the job from the supplier and registers it under the given user, keyed by the
+   * job id reported in the job's status.
+   *
+   * @param jobSupplier provides the job to register; invoked exactly once
+   * @param username user the job is registered under
+   * @return the status snapshot that was read when the job was registered
+   * @throws JobException if the job's status cannot be read
+   */
+  @Override
+  public JobStatus submit(Supplier<Statusable<PAGE_T>> jobSupplier, String username)
+      throws JobException {
+    // Resolve the job before touching the registry so a failing supplier leaves no trace.
+    Statusable<PAGE_T> job = jobSupplier.get();
+    // Single snapshot: the id used as the key and the returned status always agree.
+    JobStatus status = job.getStatus();
+    // Atomic create-or-get of the per-user map; a separate get + put could lose a map
+    // when two threads submit the first job for the same user at the same time.
+    jobs.computeIfAbsent(username,
+        user -> Collections.synchronizedMap(new HashMap<String, Statusable<PAGE_T>>()))
+        .put(status.getJobId(), job);
+    return status;
+  }
+
+  /**
+   * @return current status of the given job
+   * @throws JobException if the job is unknown for this user or its status cannot be read
+   */
+  @Override
+  public JobStatus getStatus(String username, String jobId) throws JobException {
+    return requireJob(username, jobId).getStatus();
+  }
+
+  /**
+   * @return the job's completion flag
+   * @throws JobException if the job is unknown for this user
+   */
+  @Override
+  public boolean done(String username, String jobId) throws JobException {
+    return requireJob(username, jobId).isDone();
+  }
+
+  /**
+   * Kills the given job.
+   *
+   * @throws JobException if the job is unknown for this user or the kill fails
+   */
+  @Override
+  public void killJob(String username, String jobId) throws JobException {
+    requireJob(username, jobId).kill();
+  }
+
+  /**
+   * Looks up a job without failing on a miss.
+   *
+   * @return the registered job, or {@code null} if the user or the job id is unknown
+   */
+  @Override
+  public Statusable<PAGE_T> getJob(String username, String jobId) throws JobException {
+    Map<String, Statusable<PAGE_T>> userJobs = jobs.get(username);
+    return userJobs == null ? null : userJobs.get(jobId);
+  }
+
+  /**
+   * @return a new list holding the user's jobs; empty if the user has none
+   */
+  @Override
+  public List<Statusable<PAGE_T>> getJobs(String username) throws JobException {
+    Map<String, Statusable<PAGE_T>> userJobs = jobs.get(username);
+    if (userJobs == null) {
+      return new ArrayList<Statusable<PAGE_T>>();
+    }
+    return new ArrayList<Statusable<PAGE_T>>(userJobs.values());
+  }
+
+  /** Resolves a job via {@link #getJob} and turns a miss into a {@link JobException}. */
+  private Statusable<PAGE_T> requireJob(String username, String jobId) throws JobException {
+    Statusable<PAGE_T> job = getJob(username, jobId);
+    if (job == null) {
+      throw new JobException("Could not find job " + jobId + " for user " + username);
+    }
+    return job;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java
new file mode 100644
index 0000000..eff60e5
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/JobManager.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job.manager;
+
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Statusable;
+
+/**
+ * Manages {@link Statusable} jobs on a per-user basis.
+ *
+ * @param <PAGE_T> page type used by the managed jobs
+ */
+public interface JobManager<PAGE_T> {
+
+ /**
+ * Submit a job for a user.
+ *
+ * @param jobSupplier provides the job
+ * @param username user the job belongs to
+ * @return status of the submitted job
+ */
+ JobStatus submit(Supplier<Statusable<PAGE_T>> jobSupplier, String username) throws JobException;
+
+ /**
+ * @param username user the job belongs to
+ * @param jobId id of the job
+ * @return status of the job
+ */
+ JobStatus getStatus(String username, String jobId) throws JobException;
+
+ /**
+ * @param username user the job belongs to
+ * @param jobId id of the job
+ * @return whether the job is done
+ */
+ boolean done(String username, String jobId) throws JobException;
+
+ /**
+ * Kill a user's job.
+ *
+ * @param username user the job belongs to
+ * @param jobId id of the job
+ */
+ void killJob(String username, String jobId) throws JobException;
+
+ /**
+ * @param username user the job belongs to
+ * @param jobId id of the job
+ * @return the job; behavior for an unknown job is implementation-specific
+ */
+ Statusable<PAGE_T> getJob(String username, String jobId) throws JobException;
+
+ /**
+ * @param username user whose jobs are requested
+ * @return the user's jobs
+ */
+ List<Statusable<PAGE_T>> getJobs(String username) throws JobException;
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java b/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java
new file mode 100644
index 0000000..f3a3978
--- /dev/null
+++ b/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job.manager;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.JobStatus.State;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.job.Statusable.JobType;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests InMemoryJobManager against Mockito-mocked Statusable jobs.
+ */
+public class InMemoryJobManagerTest {
+
+ // NOTE(review): tempDir and basePath are initialized but never read in this class.
+ @Rule
+ public TemporaryFolder tempDir = new TemporaryFolder();
+ @Mock
+ private Statusable<Path> job1;
+ @Mock
+ private Statusable<Path> job2;
+ @Mock
+ private Statusable<Path> job3;
+ @Mock
+ private Finalizer<Path> finalizer;
+ @Mock
+ private Pageable<Path> results;
+ private JobManager<Path> jm;
+ private Map<String, Object> config;
+ private String username1;
+ private String username2;
+ private String jobId1;
+ private String jobId2;
+ private String jobId3;
+ private String basePath;
+
+ /**
+ * Creates a fresh manager and stubs each mocked job to return itself from submit(),
+ * so that newSupplier(job) yields the same mock that was passed in.
+ */
+ @Before
+ public void setup() throws JobException {
+ MockitoAnnotations.initMocks(this);
+ jm = new InMemoryJobManager<Path>();
+ config = new HashMap<>();
+ username1 = "user123";
+ username2 = "user456";
+ jobId1 = "job_abc_123";
+ jobId2 = "job_def_456";
+ jobId3 = "job_ghi_789";
+ basePath = tempDir.getRoot().getAbsolutePath();
+ when(job1.getJobType()).thenReturn(JobType.MAP_REDUCE);
+ when(job2.getJobType()).thenReturn(JobType.MAP_REDUCE);
+ when(job3.getJobType()).thenReturn(JobType.MAP_REDUCE);
+ when(job1.submit(finalizer, config)).thenReturn(job1);
+ when(job2.submit(finalizer, config)).thenReturn(job2);
+ when(job3.submit(finalizer, config)).thenReturn(job3);
+ when(finalizer.finalizeJob(any())).thenReturn(results);
+ }
+
+ @Test
+ public void submits_job_and_returns_status() throws JobException {
+ when(job1.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId1));
+ JobStatus status = jm.submit(newSupplier(job1), username1);
+ assertThat(status.getState(), equalTo(State.RUNNING));
+ assertThat(status.getJobId(), equalTo(jobId1));
+ // re-stub after submission: the manager must report the job's latest status
+ when(job1.getStatus()).thenReturn(new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1));
+ status = jm.getStatus(username1, status.getJobId());
+ assertThat(status.getState(), equalTo(State.SUCCEEDED));
+ assertThat(status.getJobId(), equalTo(jobId1));
+ }
+
+ @Test
+ public void submits_multiple_jobs_and_returns_status() throws JobException {
+ when(job1.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId1));
+ when(job2.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId2));
+ when(job3.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId3));
+
+ // user has 1 job
+ jm.submit(newSupplier(job1), username1);
+ assertThat(jm.getJob(username1, jobId1), equalTo(job1));
+
+ // user has 2 jobs
+ jm.submit(newSupplier(job2), username1);
+ assertThat(jm.getJob(username1, jobId1), equalTo(job1));
+ assertThat(jm.getJob(username1, jobId2), equalTo(job2));
+
+ // user has 3 jobs
+ jm.submit(newSupplier(job3), username1);
+ assertThat(jm.getJob(username1, jobId1), equalTo(job1));
+ assertThat(jm.getJob(username1, jobId2), equalTo(job2));
+ assertThat(jm.getJob(username1, jobId3), equalTo(job3));
+
+ // multiple users have 3 jobs
+ jm.submit(newSupplier(job1), username2);
+ jm.submit(newSupplier(job2), username2);
+ jm.submit(newSupplier(job3), username2);
+ // user 1 still good
+ assertThat(jm.getJob(username1, jobId1), equalTo(job1));
+ assertThat(jm.getJob(username1, jobId2), equalTo(job2));
+ assertThat(jm.getJob(username1, jobId3), equalTo(job3));
+ // and also user 2
+ assertThat(jm.getJob(username2, jobId1), equalTo(job1));
+ assertThat(jm.getJob(username2, jobId2), equalTo(job2));
+ assertThat(jm.getJob(username2, jobId3), equalTo(job3));
+ }
+
+ @Test
+ public void returns_job_status() throws JobException {
+ JobStatus expected = new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1);
+ when(job1.getStatus()).thenReturn(expected);
+ jm.submit(newSupplier(job1), username1);
+ JobStatus status = jm.getStatus(username1, jobId1);
+ assertThat(status, equalTo(expected));
+ }
+
+ @Test
+ public void returns_job_is_done() throws JobException {
+ JobStatus expected = new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1);
+ when(job1.getStatus()).thenReturn(expected);
+ when(job1.isDone()).thenReturn(true);
+ jm.submit(newSupplier(job1), username1);
+ boolean done = jm.done(username1, jobId1);
+ assertThat(done, equalTo(true));
+ }
+
+ @Test
+ public void kills_job() throws JobException {
+ when(job1.getStatus()).thenReturn(new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1));
+ jm.submit(newSupplier(job1), username1);
+ jm.killJob(username1, jobId1);
+ // the manager is expected to delegate the kill to the job itself
+ verify(job1).kill();
+ }
+
+ @Test
+ public void gets_list_of_user_jobs() throws JobException {
+ when(job1.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId1));
+ when(job2.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId2));
+ when(job3.getStatus()).thenReturn(new JobStatus().withState(State.RUNNING).withJobId(jobId3));
+ // the same three mocks are registered under both users
+ jm.submit(newSupplier(job1), username1);
+ jm.submit(newSupplier(job2), username1);
+ jm.submit(newSupplier(job3), username1);
+ jm.submit(newSupplier(job1), username2);
+ jm.submit(newSupplier(job2), username2);
+ jm.submit(newSupplier(job3), username2);
+ List<Statusable<Path>> jobsUser1 = jm.getJobs(username1);
+ List<Statusable<Path>> jobsUser2 = jm.getJobs(username2);
+ assertThat("Wrong size", jobsUser1.size(), equalTo(3));
+ assertThat("Wrong size", jobsUser2.size(), equalTo(3));
+ assertThat("", jobsUser1.containsAll(Arrays.asList(job1, job2, job3)), equalTo(true));
+ assertThat("", jobsUser2.containsAll(Arrays.asList(job1, job2, job3)), equalTo(true));
+ }
+
+ /**
+ * Wraps job.submit(finalizer, config) in a Supplier. A JobException is rethrown as a
+ * RuntimeException because Supplier#get cannot throw checked exceptions.
+ */
+ private Supplier<Statusable<Path>> newSupplier(Statusable<Path> job) {
+ return () -> {
+ try {
+ return job.submit(finalizer, config);
+ } catch (JobException e) {
+ throw new RuntimeException("Something went wrong", e);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index c3b1a69..fb523ee 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -218,6 +218,12 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
deleted file mode 100644
index 1d8e3f3..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliConfig.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcap.query;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.metron.common.system.Clock;
-
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.UUID;
-import java.util.function.Consumer;
-import java.util.function.Function;
-
-public class CliConfig {
- public interface PrefixStrategy extends Function<Clock, String>{}
-
- private boolean showHelp;
- private String prefix;
- private String basePath;
- private String baseOutputPath;
- private long startTime;
- private long endTime;
- private int numReducers;
- private int numRecordsPerFile;
- private DateFormat dateFormat;
-
-
- public CliConfig(PrefixStrategy prefixStrategy) {
- showHelp = false;
- basePath = "";
- baseOutputPath = "";
- startTime = -1L;
- endTime = -1L;
- numReducers = 0;
- prefix = prefixStrategy.apply(new Clock());
- }
-
- public String getPrefix() {
- return prefix;
- }
-
- public void setPrefix(String prefix) {
- this.prefix = prefix;
- }
-
- public int getNumReducers() {
- return numReducers;
- }
-
- public boolean showHelp() {
- return showHelp;
- }
-
- public void setShowHelp(boolean showHelp) {
- this.showHelp = showHelp;
- }
-
- public String getBasePath() {
- return basePath;
- }
-
- public String getBaseOutputPath() {
- return baseOutputPath;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- public void setBasePath(String basePath) {
- this.basePath = basePath;
- }
-
- public void setBaseOutputPath(String baseOutputPath) {
- this.baseOutputPath = baseOutputPath;
- }
-
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- }
-
- public boolean isNullOrEmpty(String val) {
- return StringUtils.isEmpty(val);
- }
-
- public void setDateFormat(String dateFormat) {
- this.dateFormat = new SimpleDateFormat(dateFormat);
- }
-
- public DateFormat getDateFormat() {
- return dateFormat;
- }
-
- public void setNumReducers(int numReducers) {
- this.numReducers = numReducers;
- }
-
- public int getNumRecordsPerFile() {
- return numRecordsPerFile;
- }
-
- public void setNumRecordsPerFile(int numRecordsPerFile) {
- this.numRecordsPerFile = numRecordsPerFile;
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
index d5976ae..e6534c5 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/CliParser.java
@@ -19,19 +19,20 @@
package org.apache.metron.pcap.query;
import org.apache.commons.cli.*;
+import org.apache.metron.pcap.config.PcapConfig;
/**
* Provides commmon required fields for the PCAP filter jobs
*/
public class CliParser {
public static final String BASE_PATH_DEFAULT = "/apps/metron/pcap";
- public static final String BASE_OUTPUT_PATH_DEFAULT = "/tmp";
+ public static final String BASE_INTERIM_OUTPUT_PATH_DEFAULT = "/tmp";
public static final int NUM_REDUCERS_DEFAULT = 10;
public static final int NUM_RECORDS_PER_FILE_DEFAULT = 10000;
private CommandLineParser parser;
- protected CliConfig.PrefixStrategy prefixStrategy;
+ protected PcapConfig.PrefixStrategy prefixStrategy;
- public CliParser(CliConfig.PrefixStrategy prefixStrategy) {
+ public CliParser(PcapConfig.PrefixStrategy prefixStrategy) {
this.prefixStrategy = prefixStrategy;
parser = new PosixParser();
}
@@ -40,7 +41,8 @@ public class CliParser {
Options options = new Options();
options.addOption(newOption("h", "help", false, "Display help"));
options.addOption(newOption("bp", "base_path", true, String.format("Base PCAP data path. Default is '%s'", BASE_PATH_DEFAULT)));
- options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'", BASE_OUTPUT_PATH_DEFAULT)));
+ options.addOption(newOption("bop", "base_output_path", true, String.format("Query result output path. Default is '%s'",
+ BASE_INTERIM_OUTPUT_PATH_DEFAULT)));
options.addOption(newOption("st", "start_time", true, "(required) Packet start time range.", true));
options.addOption(newOption("nr", "num_reducers", true, String.format("Number of reducers to use (defaults to %s)", NUM_REDUCERS_DEFAULT)));
options.addOption(newOption("rpf", "records_per_file", true, String.format("Number of records to include in each output pcap file (defaults to %s)", NUM_RECORDS_PER_FILE_DEFAULT)));
@@ -59,7 +61,7 @@ public class CliParser {
return option;
}
- public void parse(CommandLine commandLine, CliConfig config) throws java.text.ParseException {
+ public void parse(CommandLine commandLine, PcapConfig config) throws java.text.ParseException {
if (commandLine.hasOption("help")) {
config.setShowHelp(true);
}
@@ -72,18 +74,18 @@ public class CliParser {
config.setBasePath(BASE_PATH_DEFAULT);
}
if (commandLine.hasOption("base_output_path")) {
- config.setBaseOutputPath(commandLine.getOptionValue("base_output_path"));
+ config.setBaseInterimResultPath(commandLine.getOptionValue("base_output_path"));
} else {
- config.setBaseOutputPath(BASE_OUTPUT_PATH_DEFAULT);
+ config.setBaseInterimResultPath(BASE_INTERIM_OUTPUT_PATH_DEFAULT);
}
if (commandLine.hasOption("start_time")) {
try {
if (commandLine.hasOption("date_format")) {
long startTime = config.getDateFormat().parse(commandLine.getOptionValue("start_time")).getTime();
- config.setStartTime(startTime);
+ config.setStartTimeMs(startTime);
} else {
long startTime = Long.parseLong(commandLine.getOptionValue("start_time"));
- config.setStartTime(startTime);
+ config.setStartTimeMs(startTime);
}
} catch (NumberFormatException nfe) {
//no-op
@@ -107,10 +109,10 @@ public class CliParser {
try {
if (commandLine.hasOption("date_format")) {
long endTime = config.getDateFormat().parse(commandLine.getOptionValue("end_time")).getTime();
- config.setEndTime(endTime);
+ config.setEndTimeMs(endTime);
} else {
long endTime = Long.parseLong(commandLine.getOptionValue("end_time"));
- config.setEndTime(endTime);
+ config.setEndTimeMs(endTime);
}
} catch (NumberFormatException nfe) {
//no-op
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
deleted file mode 100644
index 03caed7..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliConfig.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcap.query;
-
-import org.apache.metron.common.Constants;
-
-import java.util.EnumMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class FixedCliConfig extends CliConfig {
-
- private Map<String, String> fixedFields;
-
- public FixedCliConfig(PrefixStrategy prefixStrategy) {
- super(prefixStrategy);
- this.fixedFields = new LinkedHashMap<>();
- }
-
- public Map<String, String> getFixedFields() {
- return fixedFields;
- }
-
- public void setFixedFields(Map<String, String> fixedFields) {
- this.fixedFields = fixedFields;
- }
-
- public void putFixedField(String key, String value) {
- String trimmedVal = value != null ? value.trim() : null;
- if (!isNullOrEmpty(trimmedVal)) {
- this.fixedFields.put(key, value);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
index 4e1bfcf..19d351c 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/FixedCliParser.java
@@ -22,11 +22,13 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.metron.common.Constants;
import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapConfig;
public class FixedCliParser extends CliParser {
private Options fixedOptions;
- public FixedCliParser(CliConfig.PrefixStrategy prefixStrategy) {
+ public FixedCliParser(PcapConfig.PrefixStrategy prefixStrategy) {
super(prefixStrategy);
fixedOptions = buildFixedOptions();
}
@@ -51,9 +53,9 @@ public class FixedCliParser extends CliParser {
* @return Configuration tailored to fixed pcap queries
* @throws ParseException
*/
- public FixedCliConfig parse(String[] args) throws ParseException, java.text.ParseException {
+ public FixedPcapConfig parse(String[] args) throws ParseException, java.text.ParseException {
CommandLine commandLine = getParser().parse(fixedOptions, args);
- FixedCliConfig config = new FixedCliConfig(prefixStrategy);
+ FixedPcapConfig config = new FixedPcapConfig(prefixStrategy);
super.parse(commandLine, config);
config.putFixedField(Constants.Fields.SRC_ADDR.getName(), commandLine.getOptionValue("ip_src_addr"));
config.putFixedField(Constants.Fields.DST_ADDR.getName(), commandLine.getOptionValue("ip_dst_addr"));
@@ -63,7 +65,7 @@ public class FixedCliParser extends CliParser {
config.putFixedField(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), Boolean.toString(commandLine.hasOption("include_reverse")));
config.putFixedField(PcapHelper.PacketFields.PACKET_FILTER.getName(), commandLine.getOptionValue("packet_filter"));
if(commandLine.hasOption("prefix")) {
- config.setPrefix(commandLine.getOptionValue("prefix"));
+ config.setFinalFilenamePrefix(commandLine.getOptionValue("prefix"));
}
return config;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index 0fda801..3462921 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -27,45 +27,46 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.metron.common.hadoop.SequenceFileIterable;
-import org.apache.metron.common.system.Clock;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapConfig;
+import org.apache.metron.pcap.config.QueryPcapConfig;
import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.filter.query.QueryPcapFilter;
+import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies;
import org.apache.metron.pcap.mr.PcapJob;
-import org.apache.metron.pcap.writer.ResultsWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PcapCli {
private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final CliConfig.PrefixStrategy PREFIX_STRATEGY = clock -> {
- String timestamp = new Clock().currentTimeFormatted("yyyyMMddHHmm");
+ public static final PcapConfig.PrefixStrategy PREFIX_STRATEGY = clock -> {
+ String timestamp = clock.currentTimeFormatted("yyyyMMddHHmm");
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
return String.format("%s-%s", timestamp, uuid);
};
private final PcapJob jobRunner;
- private final ResultsWriter resultsWriter;
- private final CliConfig.PrefixStrategy prefixStrategy;
+ private final PcapConfig.PrefixStrategy prefixStrategy;
public static void main(String[] args) {
- int status = new PcapCli(new PcapJob(), new ResultsWriter(), PREFIX_STRATEGY).run(args);
+ int status = new PcapCli(new PcapJob(), PREFIX_STRATEGY).run(args);
System.exit(status);
}
- public PcapCli(PcapJob jobRunner, ResultsWriter resultsWriter, CliConfig.PrefixStrategy prefixStrategy) {
+ public PcapCli(PcapJob jobRunner, PcapConfig.PrefixStrategy prefixStrategy) {
this.jobRunner = jobRunner;
- this.resultsWriter = resultsWriter;
this.prefixStrategy = prefixStrategy;
}
+
public int run(String[] args) {
if (args.length < 1) {
printBasicHelp();
return -1;
}
String jobType = args[0];
- SequenceFileIterable results = null;
String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
Configuration hadoopConf = new Configuration();
String[] otherArgs = null;
@@ -75,13 +76,18 @@ public class PcapCli {
LOGGER.error("Failed to configure hadoop with provided options: {}", e.getMessage(), e);
return -1;
}
- CliConfig commonConfig = null;
+ PcapConfig commonConfig = null;
+ Pageable<Path> results;
+ // write to local FS in the executing directory
+ String execDir = System.getProperty("user.dir");
+
if ("fixed".equals(jobType)) {
FixedCliParser fixedParser = new FixedCliParser(prefixStrategy);
- FixedCliConfig config = null;
+ FixedPcapConfig config = null;
try {
config = fixedParser.parse(otherArgs);
commonConfig = config;
+ PcapOptions.FINAL_OUTPUT_PATH.put(commonConfig, new Path("file:///" + execDir));
} catch (ParseException | java.text.ParseException e) {
System.err.println(e.getMessage());
System.err.flush();
@@ -92,28 +98,24 @@ public class PcapCli {
fixedParser.printHelp();
return 0;
}
- Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime());
+ Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs());
long startTime = time.getLeft();
long endTime = time.getRight();
+ PcapOptions.START_TIME_NS.put(commonConfig, startTime);
+ PcapOptions.END_TIME_NS.put(commonConfig, endTime);
+ PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
+ PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
try {
- results = jobRunner.query(
- new Path(config.getBasePath()),
- new Path(config.getBaseOutputPath()),
- startTime,
- endTime,
- config.getNumReducers(),
- config.getFixedFields(),
- hadoopConf,
- FileSystem.get(hadoopConf),
- new FixedPcapFilter.Configurator());
- } catch (IOException | ClassNotFoundException | InterruptedException e) {
+ PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf));
+ results = jobRunner.submit(PcapFinalizerStrategies.CLI, commonConfig).get();
+ } catch (IOException|InterruptedException | JobException e) {
LOGGER.error("Failed to execute fixed filter job: {}", e.getMessage(), e);
return -1;
}
} else if ("query".equals(jobType)) {
QueryCliParser queryParser = new QueryCliParser(prefixStrategy);
- QueryCliConfig config = null;
+ QueryPcapConfig config = null;
try {
config = queryParser.parse(otherArgs);
commonConfig = config;
@@ -126,23 +128,19 @@ public class PcapCli {
queryParser.printHelp();
return 0;
}
- Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTime(), config.getEndTime());
+ Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs());
long startTime = time.getLeft();
long endTime = time.getRight();
+ PcapOptions.START_TIME_NS.put(commonConfig, startTime);
+ PcapOptions.END_TIME_NS.put(commonConfig, endTime);
+ PcapOptions.FILTER_IMPL.put(commonConfig, new org.apache.metron.pcap.filter.query.QueryPcapFilter.Configurator());
+ PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
try {
- results = jobRunner.query(
- new Path(config.getBasePath()),
- new Path(config.getBaseOutputPath()),
- startTime,
- endTime,
- config.getNumReducers(),
- config.getQuery(),
- hadoopConf,
- FileSystem.get(hadoopConf),
- new QueryPcapFilter.Configurator());
- } catch (IOException | ClassNotFoundException | InterruptedException e) {
- LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), e);
+ PcapOptions.FILESYSTEM.put(commonConfig, FileSystem.get(hadoopConf));
+ results = jobRunner.submit(PcapFinalizerStrategies.CLI, commonConfig).get();
+ } catch (IOException| InterruptedException | JobException e) {
+ LOGGER.error("Failed to execute query filter job: {}", e.getMessage(), e);
return -1;
}
} else {
@@ -150,17 +148,6 @@ public class PcapCli {
return -1;
}
- try {
- // write to local FS in the executing directory
- String execDir = System.getProperty("user.dir");
- jobRunner.writeResults(results, resultsWriter, new Path("file:///" + execDir),
- commonConfig.getNumRecordsPerFile(),
- commonConfig.getPrefix());
- } catch (IOException e) {
- LOGGER.error("Unable to write file", e);
- return -1;
- }
-
return 0;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
deleted file mode 100644
index 67f045f..0000000
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliConfig.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcap.query;
-
-public class QueryCliConfig extends CliConfig {
- private String query;
-
- public QueryCliConfig(PrefixStrategy prefixStrategy) {
- super(prefixStrategy);
- }
-
- public String getQuery() {
- return query;
- }
-
- public void setQuery(String query) {
- this.query = query;
- }
-}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
index d6e5cd1..b4375d1 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/QueryCliParser.java
@@ -20,11 +20,13 @@ package org.apache.metron.pcap.query;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.metron.pcap.config.PcapConfig;
+import org.apache.metron.pcap.config.QueryPcapConfig;
public class QueryCliParser extends CliParser {
private Options queryOptions;
- public QueryCliParser(CliConfig.PrefixStrategy prefixStrategy) {
+ public QueryCliParser(PcapConfig.PrefixStrategy prefixStrategy) {
super(prefixStrategy);
queryOptions = setupOptions();
}
@@ -43,15 +45,15 @@ public class QueryCliParser extends CliParser {
* @return Configuration tailored to query pcap queries
* @throws ParseException
*/
- public QueryCliConfig parse(String[] args) throws ParseException, java.text.ParseException {
+ public QueryPcapConfig parse(String[] args) throws ParseException, java.text.ParseException {
CommandLine commandLine = getParser().parse(queryOptions, args);
- QueryCliConfig config = new QueryCliConfig(prefixStrategy);
+ QueryPcapConfig config = new QueryPcapConfig(prefixStrategy);
super.parse(commandLine, config);
if (commandLine.hasOption("query")) {
config.setQuery(commandLine.getOptionValue("query"));
}
if(commandLine.hasOption("prefix")) {
- config.setPrefix(commandLine.getOptionValue("prefix"));
+ config.setFinalFilenamePrefix(commandLine.getOptionValue("prefix"));
}
return config;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/dbbf6243/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
index 5a5d406..1e389d9 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java
@@ -19,15 +19,12 @@
package org.apache.metron.pcap;
import static java.lang.Long.toUnsignedString;
-import static java.lang.String.format;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.startsWith;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -39,9 +36,13 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
+import org.apache.metron.job.Finalizer;
import org.apache.metron.job.JobStatus;
import org.apache.metron.job.JobStatus.State;
+import org.apache.metron.job.Pageable;
import org.apache.metron.job.Statusable;
+import org.apache.metron.pcap.config.FixedPcapConfig;
+import org.apache.metron.pcap.config.PcapOptions;
import org.apache.metron.pcap.filter.PcapFilterConfigurator;
import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
import org.apache.metron.pcap.mr.PcapJob;
@@ -54,33 +55,62 @@ import org.mockito.MockitoAnnotations;
public class PcapJobTest {
@Mock
- private Job job;
+ private Job mrJob;
@Mock
private org.apache.hadoop.mapreduce.JobStatus mrStatus;
@Mock
private JobID jobId;
- private static final String JOB_ID_VAL = "job_abc_123";
+ @Mock
+ private Finalizer<Path> finalizer;
+ private Pageable<Path> pageableResult;
+ private FixedPcapConfig config;
+ private Configuration hadoopConfig;
+ private FileSystem fileSystem;
+ private String jobIdVal = "job_abc_123";
private Path basePath;
private Path baseOutPath;
private long startTime;
private long endTime;
private int numReducers;
+ private int numRecordsPerFile;
+ private Path finalOutputPath;
private Map<String, String> fixedFields;
- private Configuration hadoopConfig;
+ private PcapJob<Map<String, String>> testJob;
+
@Before
- public void setup() {
+ public void setup() throws IOException {
MockitoAnnotations.initMocks(this);
basePath = new Path("basepath");
baseOutPath = new Path("outpath");
startTime = 100;
endTime = 200;
numReducers = 5;
+ numRecordsPerFile = 5;
fixedFields = new HashMap<>();
fixedFields.put("ip_src_addr", "192.168.1.1");
hadoopConfig = new Configuration();
- when(jobId.toString()).thenReturn(JOB_ID_VAL);
+ fileSystem = FileSystem.get(hadoopConfig);
+ finalOutputPath = new Path("finaloutpath");
+ when(jobId.toString()).thenReturn(jobIdVal);
when(mrStatus.getJobID()).thenReturn(jobId);
+ pageableResult = new PcapPages();
+ // handles setting the file name prefix under the hood
+ config = new FixedPcapConfig(clock -> "clockprefix");
+ PcapOptions.HADOOP_CONF.put(config, hadoopConfig);
+ PcapOptions.FILESYSTEM.put(config, FileSystem.get(hadoopConfig));
+ PcapOptions.BASE_PATH.put(config, basePath);
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(config, baseOutPath);
+ PcapOptions.START_TIME_NS.put(config, startTime);
+ PcapOptions.END_TIME_NS.put(config, endTime);
+ PcapOptions.NUM_REDUCERS.put(config, numReducers);
+ PcapOptions.FIELDS.put(config, fixedFields);
+ PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator());
+ PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile);
+ PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath);
+ testJob = new TestJob<>();
+ testJob.setStatusInterval(10);
+ testJob.setCompleteCheckInterval(10);
}
@Test
@@ -98,147 +128,94 @@ public class PcapJobTest {
equalTo(8));
}
- private class TestJob extends PcapJob {
+ private class TestJob<T> extends PcapJob<T> {
@Override
- public <T> Job createJob(Optional<String> jobName, Path basePath, Path outputPath, long beginNS, long endNS,
- int numReducers, T fields, Configuration conf, FileSystem fs,
+ public Job createJob(Optional<String> jobName,
+ Path basePath,
+ Path outputPath,
+ long beginNS,
+ long endNS,
+ int numReducers,
+ T fields,
+ Configuration conf,
+ FileSystem fs,
PcapFilterConfigurator<T> filterImpl) throws IOException {
- return job;
+ return mrJob;
}
}
@Test
public void job_succeeds_synchronously() throws Exception {
- when(job.isComplete()).thenReturn(true);
+ pageableResult = new PcapPages(
+ Arrays.asList(new Path("1.txt"), new Path("2.txt"), new Path("3.txt")));
+ when(finalizer.finalizeJob(any())).thenReturn(pageableResult);
+ when(mrJob.isComplete()).thenReturn(true);
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
- when(job.getStatus()).thenReturn(mrStatus);
- TestJob testJob = new TestJob();
- Statusable statusable = testJob.query(
- Optional.empty(),
- basePath,
- baseOutPath,
- startTime,
- endTime,
- numReducers,
- fixedFields,
- hadoopConfig,
- FileSystem.get(hadoopConfig),
- new FixedPcapFilter.Configurator(),
- true);
- verify(job, times(1)).waitForCompletion(true);
+ when(mrJob.getStatus()).thenReturn(mrStatus);
+ Statusable<Path> statusable = testJob.submit(finalizer, config);
+ Pageable<Path> results = statusable.get();
+ Assert.assertThat(results.getSize(), equalTo(3));
JobStatus status = statusable.getStatus();
Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
- String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString();
- Assert.assertThat(status.getResultPath(), notNullValue());
- Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath));
+ Assert.assertThat(status.getJobId(), equalTo(jobIdVal));
}
@Test
public void job_fails_synchronously() throws Exception {
- when(job.isComplete()).thenReturn(true);
+ when(mrJob.isComplete()).thenReturn(true);
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
- when(job.getStatus()).thenReturn(mrStatus);
- TestJob testJob = new TestJob();
- Statusable statusable = testJob.query(
- Optional.empty(),
- basePath,
- baseOutPath,
- startTime,
- endTime,
- numReducers,
- fixedFields,
- hadoopConfig,
- FileSystem.get(hadoopConfig),
- new FixedPcapFilter.Configurator(),
- true);
+ when(mrJob.getStatus()).thenReturn(mrStatus);
+ Statusable<Path> statusable = testJob.submit(finalizer, config);
+ Pageable<Path> results = statusable.get();
JobStatus status = statusable.getStatus();
Assert.assertThat(status.getState(), equalTo(State.FAILED));
Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
- String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString();
- Assert.assertThat(status.getResultPath(), notNullValue());
- Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath));
+ Assert.assertThat(results.getSize(), equalTo(0));
}
@Test
public void job_fails_with_killed_status_synchronously() throws Exception {
- when(job.isComplete()).thenReturn(true);
+ when(mrJob.isComplete()).thenReturn(true);
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED);
- when(job.getStatus()).thenReturn(mrStatus);
- TestJob testJob = new TestJob();
- Statusable statusable = testJob.query(
- Optional.empty(),
- basePath,
- baseOutPath,
- startTime,
- endTime,
- numReducers,
- fixedFields,
- hadoopConfig,
- FileSystem.get(hadoopConfig),
- new FixedPcapFilter.Configurator(),
- true);
+ when(mrJob.getStatus()).thenReturn(mrStatus);
+ Statusable<Path> statusable = testJob.submit(finalizer, config);
+ Pageable<Path> results = statusable.get();
JobStatus status = statusable.getStatus();
Assert.assertThat(status.getState(), equalTo(State.KILLED));
Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
- String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString();
- Assert.assertThat(status.getResultPath(), notNullValue());
- Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath));
+ Assert.assertThat(results.getSize(), equalTo(0));
}
@Test
public void job_succeeds_asynchronously() throws Exception {
- when(job.isComplete()).thenReturn(true);
+ // not complete a few times to make sure cancel works as expected
+ when(mrJob.isComplete()).thenReturn(false, false, false, true);
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED);
- when(job.getStatus()).thenReturn(mrStatus);
- TestJob testJob = new TestJob();
- Statusable statusable = testJob.query(
- Optional.empty(),
- basePath,
- baseOutPath,
- startTime,
- endTime,
- numReducers,
- fixedFields,
- hadoopConfig,
- FileSystem.get(hadoopConfig),
- new FixedPcapFilter.Configurator(),
- false);
+ when(mrJob.getStatus()).thenReturn(mrStatus);
+ Statusable<Path> statusable = testJob.submit(finalizer, config);
+ while (!statusable.isDone()) {
+ }
JobStatus status = statusable.getStatus();
Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED));
Assert.assertThat(status.getPercentComplete(), equalTo(100.0));
- String expectedOutPath = new Path(baseOutPath, format("%s_%s_%s", startTime, endTime, "192.168.1.1")).toString();
- Assert.assertThat(status.getResultPath(), notNullValue());
- Assert.assertThat(status.getResultPath().toString(), startsWith(expectedOutPath));
}
@Test
public void job_reports_percent_complete() throws Exception {
- when(job.isComplete()).thenReturn(false);
+ when(mrJob.isComplete()).thenReturn(false);
when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING);
- when(job.getStatus()).thenReturn(mrStatus);
- TestJob testJob = new TestJob();
- Statusable statusable = testJob.query(
- Optional.empty(),
- basePath,
- baseOutPath,
- startTime,
- endTime,
- numReducers,
- fixedFields,
- hadoopConfig,
- FileSystem.get(hadoopConfig),
- new FixedPcapFilter.Configurator(),
- false);
- when(job.mapProgress()).thenReturn(0.5f);
- when(job.reduceProgress()).thenReturn(0f);
+ when(mrJob.getStatus()).thenReturn(mrStatus);
+ Statusable<Path> statusable = testJob.submit(finalizer, config);
+ when(mrJob.mapProgress()).thenReturn(0.5f);
+ when(mrJob.reduceProgress()).thenReturn(0f);
JobStatus status = statusable.getStatus();
Assert.assertThat(status.getState(), equalTo(State.RUNNING));
Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%"));
Assert.assertThat(status.getPercentComplete(), equalTo(25.0));
- when(job.mapProgress()).thenReturn(1.0f);
- when(job.reduceProgress()).thenReturn(0.5f);
+ when(mrJob.mapProgress()).thenReturn(1.0f);
+ when(mrJob.reduceProgress()).thenReturn(0.5f);
status = statusable.getStatus();
Assert.assertThat(status.getPercentComplete(), equalTo(75.0));
Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%"));