You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by rm...@apache.org on 2018/08/17 15:34:22 UTC
[07/51] [abbrv] metron git commit: METRON-1674 Create REST endpoint
for job status abstraction (merrimanr) closes apache/metron#1109
METRON-1674 Create REST endpoint for job status abstraction (merrimanr) closes apache/metron#1109
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/39ae9f46
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/39ae9f46
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/39ae9f46
Branch: refs/heads/master
Commit: 39ae9f4642073d3d4f0fa423339dd97f85974588
Parents: dbbf624
Author: merrimanr <me...@gmail.com>
Authored: Thu Jul 19 11:01:49 2018 -0500
Committer: rmerriman <me...@gmail.com>
Committed: Thu Jul 19 11:01:49 2018 -0500
----------------------------------------------------------------------
.../rest/model/pcap/FixedPcapOptions.java | 42 ++++
.../rest/model/pcap/FixedPcapRequest.java | 72 ++++--
.../metron/rest/model/pcap/PcapRequest.java | 65 +++---
.../metron/rest/model/pcap/PcapStatus.java | 91 ++++++++
.../apache/metron/rest/MetronRestConstants.java | 6 +-
.../apache/metron/rest/config/PcapConfig.java | 14 +-
.../metron/rest/config/PcapJobSupplier.java | 54 +++++
.../metron/rest/controller/PcapController.java | 34 +--
.../apache/metron/rest/service/PcapService.java | 6 +-
.../rest/service/impl/PcapServiceImpl.java | 126 ++++++-----
.../src/main/resources/application.yml | 6 +-
.../apache/metron/rest/config/TestConfig.java | 17 +-
.../PcapControllerIntegrationTest.java | 127 ++++++++++-
.../apache/metron/rest/mock/MockPcapJob.java | 106 ++++++---
.../metron/rest/mock/MockPcapJobSupplier.java | 36 +++
.../rest/service/impl/PcapServiceImplTest.java | 217 +++++++++++++------
.../common/configuration/ConfigOption.java | 12 +-
.../apache/metron/job/JobNotFoundException.java | 30 +++
.../apache/metron/job/RuntimeJobException.java | 30 +++
.../metron/job/manager/InMemoryJobManager.java | 11 +-
.../org/apache/metron/pcap/query/PcapCli.java | 12 -
.../PcapTopologyIntegrationTest.java | 6 +-
.../apache/metron/pcap/query/PcapCliTest.java | 2 -
.../apache/metron/pcap/config/PcapOptions.java | 2 +
.../metron/pcap/finalizer/PcapCliFinalizer.java | 4 +-
.../metron/pcap/finalizer/PcapFinalizer.java | 8 +-
.../pcap/finalizer/PcapRestFinalizer.java | 22 +-
.../java/org/apache/metron/pcap/mr/PcapJob.java | 18 +-
28 files changed, 882 insertions(+), 294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java
new file mode 100644
index 0000000..5e77005
--- /dev/null
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapOptions.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.model.pcap;
+
+import org.apache.metron.common.configuration.ConfigOption;
+
+/**
+ * Option keys for the filter fields of a fixed pcap request. Each constant carries
+ * the string key that is exposed through {@link #getKey()}.
+ */
+public enum FixedPcapOptions implements ConfigOption {
+  IP_SRC_ADDR("ipSrcAddr"),
+  IP_DST_ADDR("ipDstAddr"),
+  IP_SRC_PORT("ipSrcPort"),
+  IP_DST_PORT("ipDstPort"),
+  PROTOCOL("protocol"),
+  PACKET_FILTER("packetFilter"),
+  INCLUDE_REVERSE("includeReverse")
+  ;
+
+  // Enum constants are shared by every user of the type, so the key is assigned once
+  // in the constructor and is only readable through getKey().
+  private final String key;
+
+  /**
+   * @param key the string key associated with this option
+   */
+  FixedPcapOptions(String key) {
+    this.key = key;
+  }
+
+  /**
+   * @return the string key associated with this option
+   */
+  @Override
+  public String getKey() {
+    return key;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java
index 758340b..a2d345b 100644
--- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/FixedPcapRequest.java
@@ -17,70 +17,100 @@
*/
package org.apache.metron.rest.model.pcap;
+import org.apache.metron.common.Constants;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
+
+import java.util.HashMap;
+import java.util.Map;
+
public class FixedPcapRequest extends PcapRequest {
- private String ipSrcAddr;
- private String ipDstAddr;
- private Integer ipSrcPort;
- private Integer ipDstPort;
- private String protocol;
- private String packetFilter;
- private Boolean includeReverse = false;
+ public FixedPcapRequest() {
+ PcapOptions.FILTER_IMPL.put(this, new FixedPcapFilter.Configurator());
+ }
public String getIpSrcAddr() {
- return ipSrcAddr;
+ return FixedPcapOptions.IP_SRC_ADDR.get(this, String.class);
}
public void setIpSrcAddr(String ipSrcAddr) {
- this.ipSrcAddr = ipSrcAddr;
+ FixedPcapOptions.IP_SRC_ADDR.put(this, ipSrcAddr);
}
public String getIpDstAddr() {
- return ipDstAddr;
+ return FixedPcapOptions.IP_DST_ADDR.get(this, String.class);
}
public void setIpDstAddr(String ipDstAddr) {
- this.ipDstAddr = ipDstAddr;
+ FixedPcapOptions.IP_DST_ADDR.put(this, ipDstAddr);
}
public Integer getIpSrcPort() {
- return ipSrcPort;
+ return FixedPcapOptions.IP_SRC_PORT.get(this, Integer.class);
}
public void setIpSrcPort(Integer ipSrcPort) {
- this.ipSrcPort = ipSrcPort;
+ FixedPcapOptions.IP_SRC_PORT.put(this, ipSrcPort);
}
public Integer getIpDstPort() {
- return ipDstPort;
+ return FixedPcapOptions.IP_DST_PORT.get(this, Integer.class);
}
public void setIpDstPort(Integer ipDstPort) {
- this.ipDstPort = ipDstPort;
+ FixedPcapOptions.IP_DST_PORT.put(this, ipDstPort);
}
public String getProtocol() {
- return protocol;
+ return FixedPcapOptions.PROTOCOL.get(this, String.class);
}
public void setProtocol(String protocol) {
- this.protocol = protocol;
+ FixedPcapOptions.PROTOCOL.put(this, protocol);
}
public String getPacketFilter() {
- return packetFilter;
+ return FixedPcapOptions.PACKET_FILTER.get(this, String.class);
}
public void setPacketFilter(String packetFilter) {
- this.packetFilter = packetFilter;
+ FixedPcapOptions.PACKET_FILTER.put(this, packetFilter);
}
public Boolean getIncludeReverse() {
- return includeReverse;
+ return FixedPcapOptions.INCLUDE_REVERSE.get(this, Boolean.class);
}
public void setIncludeReverse(Boolean includeReverse) {
- this.includeReverse = includeReverse;
+ FixedPcapOptions.INCLUDE_REVERSE.put(this, includeReverse);
+ }
+
+ public void setFields() {
+ Map<String, String> fields = new HashMap<>();
+ if (getIpSrcAddr() != null) {
+ fields.put(Constants.Fields.SRC_ADDR.getName(), getIpSrcAddr());
+ }
+ if (getIpDstAddr() != null) {
+ fields.put(Constants.Fields.DST_ADDR.getName(), getIpDstAddr());
+ }
+ if (getIpSrcPort() != null) {
+ fields.put(Constants.Fields.SRC_PORT.getName(), getIpSrcPort().toString());
+ }
+ if (getIpDstPort() != null) {
+ fields.put(Constants.Fields.DST_PORT.getName(), getIpDstPort().toString());
+ }
+ if (getProtocol() != null) {
+ fields.put(Constants.Fields.PROTOCOL.getName(), getProtocol());
+ }
+ if (getIncludeReverse() != null) {
+ fields.put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), getIncludeReverse().toString());
+ }
+ if (getPacketFilter() != null) {
+ fields.put(PcapHelper.PacketFields.PACKET_FILTER.getName(), getPacketFilter());
+ }
+ PcapOptions.FIELDS.put(this, fields);
}
@Override
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java
index 5941d17..64ed932 100644
--- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapRequest.java
@@ -17,48 +17,57 @@
*/
package org.apache.metron.rest.model.pcap;
-// TODO reconcile with pcapmrjob
-
import org.apache.commons.collections4.map.AbstractMapDecorator;
import org.apache.metron.pcap.config.PcapOptions;
+import java.util.HashMap;
+
public class PcapRequest extends AbstractMapDecorator<String, Object> {
public PcapRequest() {
- setStartTime(0L);
- setEndTime(System.currentTimeMillis());
- setNumReducers(1);
+ super(new HashMap<>());
+ setStartTimeMs(0L);
+ setEndTimeMs(System.currentTimeMillis());
+ setNumReducers(10);
+ }
+
+ public String getBasePath() {
+ return PcapOptions.BASE_PATH.get(this, String.class);
+ }
+
+ public void setBasePath(String basePath) {
+ PcapOptions.BASE_PATH.put(this, basePath);
}
- public String getBaseOutputPath() {
+ public String getBaseInterimResultPath() {
return PcapOptions.BASE_INTERIM_RESULT_PATH.get(this, String.class);
}
- public void setBaseOutputPath(String baseOutputPath) {
- PcapOptions.BASE_INTERIM_RESULT_PATH.put(this, baseOutputPath);
+ public void setBaseInterimResultPath(String baseInterimResultPath) {
+ PcapOptions.BASE_INTERIM_RESULT_PATH.put(this, baseInterimResultPath);
}
- public String getBasePath() {
- return PcapOptions.BASE_PATH.get(this, String.class);
+ public String getFinalOutputPath() {
+ return PcapOptions.FINAL_OUTPUT_PATH.get(this, String.class);
}
- public void setBasePath(String basePath) {
- PcapOptions.BASE_PATH.put(this, basePath);
+ public void setFinalOutputPath(String finalOutputPath) {
+ PcapOptions.FINAL_OUTPUT_PATH.put(this, finalOutputPath);
}
- public Long getStartTime() {
+ public Long getStartTimeMs() {
return PcapOptions.START_TIME_MS.get(this, Long.class);
}
- public void setStartTime(Long startTime) {
+ public void setStartTimeMs(Long startTime) {
PcapOptions.START_TIME_MS.put(this, startTime);
}
- public Long getEndTime() {
+ public Long getEndTimeMs() {
return PcapOptions.END_TIME_MS.get(this, Long.class);
}
- public void setEndTime(Long endTime) {
+ public void setEndTimeMs(Long endTime) {
PcapOptions.END_TIME_MS.put(this, endTime);
}
@@ -69,28 +78,4 @@ public class PcapRequest extends AbstractMapDecorator<String, Object> {
public void setNumReducers(Integer numReducers) {
PcapOptions.NUM_REDUCERS.put(this, numReducers);
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- PcapRequest pcapRequest = (PcapRequest) o;
-
- return (getBaseOutputPath() != null ? getBaseOutputPath().equals(pcapRequest.getBaseOutputPath()) : pcapRequest.getBaseOutputPath() != null) &&
- (getBasePath() != null ? getBasePath().equals(pcapRequest.getBasePath()) : pcapRequest.getBasePath() == null) &&
- (getStartTime() != null ? getStartTime().equals(pcapRequest.getStartTime()) : pcapRequest.getStartTime() == null) &&
- (getEndTime() != null ? getEndTime().equals(pcapRequest.getEndTime()) : pcapRequest.getEndTime() == null) &&
- (getNumReducers() != null ? getNumReducers().equals(pcapRequest.getNumReducers()) : pcapRequest.getNumReducers() == null);
- }
-
- @Override
- public int hashCode() {
- int result = getBaseOutputPath() != null ? getBaseOutputPath().hashCode() : 0;
- result = 31 * result + (getBasePath() != null ? getBasePath().hashCode() : 0);
- result = 31 * result + (getStartTime() != null ? getStartTime().hashCode() : 0);
- result = 31 * result + (getEndTime() != null ? getEndTime().hashCode() : 0);
- result = 31 * result + (getNumReducers() != null ? getNumReducers().hashCode() : 0);
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java
new file mode 100644
index 0000000..f004eb5
--- /dev/null
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/pcap/PcapStatus.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.model.pcap;
+
+/**
+ * Simple bean describing the state of a pcap job: its id, status, description,
+ * percent complete and total number of result pages.
+ */
+public class PcapStatus {
+
+  private String jobId;
+  private String jobStatus;
+  private String description;
+  private Double percentComplete = 0.0;
+  private Integer pageTotal = 0;
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+
+  public String getJobStatus() {
+    return jobStatus;
+  }
+
+  public void setJobStatus(String jobStatus) {
+    this.jobStatus = jobStatus;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public void setDescription(String description) {
+    this.description = description;
+  }
+
+  public Double getPercentComplete() {
+    return percentComplete;
+  }
+
+  public void setPercentComplete(Double percentComplete) {
+    this.percentComplete = percentComplete;
+  }
+
+  public Integer getPageTotal() {
+    return pageTotal;
+  }
+
+  public void setPageTotal(Integer size) {
+    this.pageTotal = size;
+  }
+
+  /**
+   * Compares all five properties. A property that is {@code null} on both sides
+   * counts as equal; a property that is {@code null} on only one side does not.
+   *
+   * @param o the object to compare with
+   * @return {@code true} if {@code o} is a {@code PcapStatus} of the same class with equal properties
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    PcapStatus pcapStatus = (PcapStatus) o;
+
+    // java.util.Objects is written fully qualified because this file has no import section.
+    return java.util.Objects.equals(getJobId(), pcapStatus.getJobId()) &&
+        java.util.Objects.equals(getJobStatus(), pcapStatus.getJobStatus()) &&
+        java.util.Objects.equals(getDescription(), pcapStatus.getDescription()) &&
+        java.util.Objects.equals(getPercentComplete(), pcapStatus.getPercentComplete()) &&
+        java.util.Objects.equals(getPageTotal(), pcapStatus.getPageTotal());
+  }
+
+  /**
+   * Combines the hash codes of the same five properties used by {@link #equals(Object)},
+   * treating a {@code null} property as 0.
+   *
+   * @return the combined hash code
+   */
+  @Override
+  public int hashCode() {
+    int result = java.util.Objects.hashCode(getJobId());
+    result = 31 * result + java.util.Objects.hashCode(getJobStatus());
+    result = 31 * result + java.util.Objects.hashCode(getDescription());
+    result = 31 * result + java.util.Objects.hashCode(getPercentComplete());
+    result = 31 * result + java.util.Objects.hashCode(getPageTotal());
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index 0989d12..8e14e38 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -75,6 +75,8 @@ public class MetronRestConstants {
public static final String LOGGING_SYSTEM_PROPERTY = "org.springframework.boot.logging.LoggingSystem";
- public static final String PCAP_INPUT_PATH_SPRING_PROPERTY = "pcap.input.path";
- public static final String PCAP_OUTPUT_PATH_SPRING_PROPERTY = "pcap.output.path";
+ public static final String PCAP_BASE_PATH_SPRING_PROPERTY = "pcap.base.path";
+ public static final String PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY = "pcap.base.interim.result.path";
+ public static final String PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY = "pcap.final.output.path";
+ public static final String PCAP_PAGE_SIZE_SPRING_PROPERTY = "pcap.page.size";
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java
index 8da5f96..a0b7f18 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapConfig.java
@@ -17,7 +17,8 @@
*/
package org.apache.metron.rest.config;
-import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.job.manager.InMemoryJobManager;
+import org.apache.metron.job.manager.JobManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@@ -29,7 +30,14 @@ import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
public class PcapConfig {
@Bean
- public PcapJob pcapJob() {
- return new PcapJob();
+ public JobManager jobManager() {
+ return new InMemoryJobManager();
}
+
+ @Bean
+ public PcapJobSupplier pcapJobSupplier() {
+ return new PcapJobSupplier();
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
new file mode 100644
index 0000000..1e79f6a
--- /dev/null
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/PcapJobSupplier.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.config;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.RuntimeJobException;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.pcap.finalizer.PcapFinalizerStrategies;
+import org.apache.metron.pcap.finalizer.PcapRestFinalizer;
+import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.rest.model.pcap.PcapRequest;
+
+import java.util.function.Supplier;
+
+/**
+ * {@link Supplier} that submits a pcap job for the request last passed to
+ * {@link #setPcapRequest(PcapRequest)} and returns the resulting {@link Statusable}.
+ *
+ * NOTE(review): the request is kept in a mutable instance field. If one instance is
+ * shared between concurrent callers, a set/get pair from one caller could interleave
+ * with another's - confirm how the bean is scoped and used.
+ */
+public class PcapJobSupplier implements Supplier<Statusable<Path>> {
+
+  private PcapRequest pcapRequest;
+
+  /**
+   * Creates a pcap job and submits it with the REST finalizer strategy and the
+   * currently configured request.
+   *
+   * @return the submitted job
+   * @throws RuntimeJobException if submission fails; the original {@link JobException}
+   *         is attached as the cause
+   */
+  @Override
+  public Statusable<Path> get() {
+    try {
+      PcapJob<Path> pcapJob = createPcapJob();
+      return pcapJob.submit(PcapFinalizerStrategies.REST, pcapRequest);
+    } catch (JobException e) {
+      // Supplier.get() cannot throw checked exceptions, so rethrow unchecked. The cause
+      // is attached with initCause so the original stack trace is not lost.
+      RuntimeJobException wrapped = new RuntimeJobException(e.getMessage());
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+  }
+
+  /**
+   * @param pcapRequest the request that the next call to {@link #get()} will submit
+   */
+  public void setPcapRequest(PcapRequest pcapRequest) {
+    this.pcapRequest = pcapRequest;
+  }
+
+  /**
+   * Factory hook for the job instance used by {@link #get()}; protected so that a
+   * subclass can supply a different job.
+   *
+   * @return a new {@link PcapJob}
+   */
+  protected PcapJob createPcapJob() {
+    return new PcapJob();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java
index 3524a8c..38bffb4 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/PcapController.java
@@ -21,10 +21,14 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import org.apache.hadoop.fs.Path;
import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Statusable;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.PcapResponse;
import org.apache.metron.rest.model.pcap.FixedPcapRequest;
+import org.apache.metron.rest.model.pcap.PcapStatus;
+import org.apache.metron.rest.security.SecurityUtils;
import org.apache.metron.rest.service.PcapService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
@@ -33,8 +37,12 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import java.util.List;
+import java.util.Set;
+
@RestController
@RequestMapping("/api/v1/pcap")
public class PcapController {
@@ -45,27 +53,23 @@ public class PcapController {
@ApiOperation(value = "Executes a Fixed Pcap Query.")
@ApiResponses(value = { @ApiResponse(message = "Returns a job status with job ID.", code = 200)})
@RequestMapping(value = "/fixed", method = RequestMethod.POST)
- ResponseEntity<JobStatus> fixed(@ApiParam(name="fixedPcapRequest", value="A Fixed Pcap Request"
+ ResponseEntity<PcapStatus> fixed(@ApiParam(name="fixedPcapRequest", value="A Fixed Pcap Request"
+ " which includes fixed filter fields like ip source address and protocol.", required=true)@RequestBody FixedPcapRequest fixedPcapRequest) throws RestException {
- JobStatus jobStatus = pcapQueryService.fixed(fixedPcapRequest);
- return new ResponseEntity<>(jobStatus, HttpStatus.OK);
+ PcapStatus pcapStatus = pcapQueryService.fixed(SecurityUtils.getCurrentUser(), fixedPcapRequest);
+ return new ResponseEntity<>(pcapStatus, HttpStatus.OK);
}
@ApiOperation(value = "Gets job status for running job.")
@ApiResponses(value = { @ApiResponse(message = "Returns a job status for the passed job.", code = 200)})
- @RequestMapping(value = "/getStatus", method = RequestMethod.GET)
- ResponseEntity<JobStatus> getStatus(@ApiParam(name="jobId", value="Job ID of submitted job"
+ @RequestMapping(value = "/{jobId}", method = RequestMethod.GET)
+ ResponseEntity<PcapStatus> getStatus(@ApiParam(name="jobId", value="Job ID of submitted job"
+ " which includes fixed filter fields like ip source address and protocol.", required=true)@PathVariable String jobId) throws RestException {
- JobStatus jobStatus = pcapQueryService.getJobStatus("metron", jobId);
- return new ResponseEntity<>(jobStatus, HttpStatus.OK);
- }
+ PcapStatus jobStatus = pcapQueryService.getJobStatus(SecurityUtils.getCurrentUser(), jobId);
+ if (jobStatus != null) {
+ return new ResponseEntity<>(jobStatus, HttpStatus.OK);
+ } else {
+ return new ResponseEntity<>(HttpStatus.NOT_FOUND);
+ }
- @ApiOperation(value = "Gets results of a pcap job.")
- @ApiResponses(value = { @ApiResponse(message = "Returns a PcapResponse containing an array of pcaps.", code = 200)})
- @RequestMapping(value = "/getPage", method = RequestMethod.GET)
- ResponseEntity<PcapResponse> getPage(@ApiParam(name="fixedPcapRequest", value="Job ID of submitted job"
- + " which includes fixed filter fields like ip source address and protocol.", required=true)@RequestBody String jobId, int pageNum) throws RestException {
- PcapResponse pcapResponse = pcapQueryService.getPage("metron", jobId, pageNum);
- return new ResponseEntity<>(pcapResponse, HttpStatus.OK);
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java
index ce8372c..603e013 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/PcapService.java
@@ -18,10 +18,12 @@
package org.apache.metron.rest.service;
import org.apache.metron.rest.RestException;
-import org.apache.metron.rest.model.PcapResponse;
import org.apache.metron.rest.model.pcap.FixedPcapRequest;
+import org.apache.metron.rest.model.pcap.PcapStatus;
public interface PcapService {
- PcapResponse fixed(FixedPcapRequest fixedPcapRequest) throws RestException;
+ PcapStatus fixed(String username, FixedPcapRequest fixedPcapRequest) throws RestException;
+
+ PcapStatus getJobStatus(String username, String jobId) throws RestException;
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
index 4dae1e5..218e9be 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/PcapServiceImpl.java
@@ -20,101 +20,107 @@ package org.apache.metron.rest.service.impl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.hadoop.SequenceFileIterable;
-import org.apache.metron.common.utils.timestamp.TimestampConverters;
-import org.apache.metron.pcap.PcapHelper;
-import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
-import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobNotFoundException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.job.manager.JobManager;
+import org.apache.metron.pcap.config.PcapOptions;
+import org.apache.metron.pcap.finalizer.PcapRestFinalizer;
import org.apache.metron.rest.MetronRestConstants;
import org.apache.metron.rest.RestException;
-import org.apache.metron.rest.model.PcapResponse;
+import org.apache.metron.rest.config.PcapJobSupplier;
import org.apache.metron.rest.model.pcap.FixedPcapRequest;
+import org.apache.metron.rest.model.pcap.PcapRequest;
+import org.apache.metron.rest.model.pcap.PcapStatus;
import org.apache.metron.rest.service.PcapService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
@Service
public class PcapServiceImpl implements PcapService {
private Environment environment;
private Configuration configuration;
- private PcapJob pcapJob;
+ private PcapJobSupplier pcapJobSupplier;
+ private JobManager<Path> jobManager;
@Autowired
- public PcapServiceImpl(Environment environment, Configuration configuration, PcapJob pcapJob) {
+ public PcapServiceImpl(Environment environment, Configuration configuration, PcapJobSupplier pcapJobSupplier, JobManager<Path> jobManager) {
this.environment = environment;
this.configuration = configuration;
- this.pcapJob = pcapJob;
+ this.pcapJobSupplier = pcapJobSupplier;
+ this.jobManager = jobManager;
}
@Override
- public PcapResponse fixed(FixedPcapRequest fixedPcapRequest) throws RestException {
- if (fixedPcapRequest.getBasePath() == null) {
- fixedPcapRequest.setBasePath(environment.getProperty(MetronRestConstants.PCAP_INPUT_PATH_SPRING_PROPERTY));
- }
- if (fixedPcapRequest.getBaseOutputPath() == null) {
- fixedPcapRequest.setBaseOutputPath(environment.getProperty(MetronRestConstants.PCAP_OUTPUT_PATH_SPRING_PROPERTY));
+ public PcapStatus fixed(String username, FixedPcapRequest fixedPcapRequest) throws RestException { // Submits the fixed pcap request as a job for the given user and returns the submitted job's status instead of packet data
+ try {
+ setPcapOptions(username, fixedPcapRequest); // fills in job options and any paths the request left null
+ fixedPcapRequest.setFields();
+ pcapJobSupplier.setPcapRequest(fixedPcapRequest); // NOTE(review): the request is stored on the injected supplier field before submit; confirm this is safe if two requests are handled at the same time
+ JobStatus jobStatus = jobManager.submit(pcapJobSupplier, username); // the job is registered under the username
+ return jobStatusToPcapStatus(jobStatus);
+ } catch (IOException | JobException e) { // both failure types reach the caller as RestException
+ throw new RestException(e);
}
- PcapResponse response = new PcapResponse();
- SequenceFileIterable results;
+ }
+
+ @Override
+ public PcapStatus getJobStatus(String username, String jobId) throws RestException { // Looks up a job by username and job id; returns null when no status could be obtained
+ PcapStatus pcapStatus = null; // remains null unless the job manager returns a job
try {
- results = pcapJob.query(
- new Path(fixedPcapRequest.getBasePath()),
- new Path(fixedPcapRequest.getBaseOutputPath()),
- TimestampConverters.MILLISECONDS.toNanoseconds(fixedPcapRequest.getStartTime()),
- TimestampConverters.MILLISECONDS.toNanoseconds(fixedPcapRequest.getEndTime()),
- fixedPcapRequest.getNumReducers(),
- getFixedFields(fixedPcapRequest),
- configuration,
- getFileSystem(),
- new FixedPcapFilter.Configurator()
- );
- if (results != null) {
- List<byte[]> pcaps = new ArrayList<>();
- results.iterator().forEachRemaining(pcaps::add);
- response.setPcaps(pcaps);
+ Statusable<Path> statusable = jobManager.getJob(username, jobId);
+ if (statusable != null) {
+ pcapStatus = jobStatusToPcapStatus(statusable.getStatus());
+ if (statusable.isDone()) { // the page total is only filled in once the job reports done
+ Pageable<Path> pageable = statusable.get();
+ if (pageable != null) {
+ pcapStatus.setPageTotal(pageable.getSize());
+ }
+ }
}
- } catch (IOException | ClassNotFoundException | InterruptedException e) {
+ } catch (JobNotFoundException | InterruptedException e) { // NOTE(review): InterruptedException is swallowed here without restoring the thread's interrupt flag; confirm that is intended
+ // do nothing and return null pcapStatus
+ } catch (JobException e) { // any other job failure is wrapped in RestException
throw new RestException(e);
}
- return response;
+ return pcapStatus;
}
- protected Map<String, String> getFixedFields(FixedPcapRequest fixedPcapRequest) {
- Map<String, String> fixedFields = new HashMap<>();
- if (fixedPcapRequest.getIpSrcAddr() != null) {
- fixedFields.put(Constants.Fields.SRC_ADDR.getName(), fixedPcapRequest.getIpSrcAddr());
- }
- if (fixedPcapRequest.getIpDstAddr() != null) {
- fixedFields.put(Constants.Fields.DST_ADDR.getName(), fixedPcapRequest.getIpDstAddr());
- }
- if (fixedPcapRequest.getIpSrcPort() != null) {
- fixedFields.put(Constants.Fields.SRC_PORT.getName(), fixedPcapRequest.getIpSrcPort().toString());
- }
- if (fixedPcapRequest.getIpDstPort() != null) {
- fixedFields.put(Constants.Fields.DST_PORT.getName(), fixedPcapRequest.getIpDstPort().toString());
- }
- if (fixedPcapRequest.getProtocol() != null) {
- fixedFields.put(Constants.Fields.PROTOCOL.getName(), fixedPcapRequest.getProtocol());
+ protected void setPcapOptions(String username, PcapRequest pcapRequest) throws IOException { // Writes job options onto the request and falls back to environment properties for any path the request left null
+ PcapOptions.JOB_NAME.put(pcapRequest, "jobName"); // NOTE(review): every request gets the same literal job name; confirm this is intended
+ PcapOptions.USERNAME.put(pcapRequest, username);
+ PcapOptions.HADOOP_CONF.put(pcapRequest, configuration);
+ PcapOptions.FILESYSTEM.put(pcapRequest, getFileSystem()); // getFileSystem() is declared to throw IOException
+
+ if (pcapRequest.getBasePath() == null) { // caller did not supply a base path
+ pcapRequest.setBasePath(environment.getProperty(MetronRestConstants.PCAP_BASE_PATH_SPRING_PROPERTY));
}
- if (fixedPcapRequest.getIncludeReverse() != null) {
- fixedFields.put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), fixedPcapRequest.getIncludeReverse().toString());
+ if (pcapRequest.getBaseInterimResultPath() == null) { // caller did not supply an interim result path
+ pcapRequest.setBaseInterimResultPath(environment.getProperty(MetronRestConstants.PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY));
}
- if (fixedPcapRequest.getPacketFilter() != null) {
- fixedFields.put(PcapHelper.PacketFields.PACKET_FILTER.getName(), fixedPcapRequest.getPacketFilter());
+ if (pcapRequest.getFinalOutputPath() == null) { // caller did not supply a final output path
+ pcapRequest.setFinalOutputPath(environment.getProperty(MetronRestConstants.PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY));
}
- return fixedFields;
+
+ PcapOptions.NUM_RECORDS_PER_FILE.put(pcapRequest, Integer.parseInt(environment.getProperty(MetronRestConstants.PCAP_PAGE_SIZE_SPRING_PROPERTY))); // NOTE(review): Integer.parseInt throws NumberFormatException if the page size property is missing or not numeric
}
protected FileSystem getFileSystem() throws IOException {
return FileSystem.get(configuration);
}
+
+ protected PcapStatus jobStatusToPcapStatus(JobStatus jobStatus) { // Copies the fields of a JobStatus into a new PcapStatus
+ PcapStatus pcapStatus = new PcapStatus();
+ pcapStatus.setJobId(jobStatus.getJobId());
+ pcapStatus.setJobStatus(jobStatus.getState().toString()); // the state is carried as its string form
+ pcapStatus.setDescription(jobStatus.getDescription());
+ pcapStatus.setPercentComplete(jobStatus.getPercentComplete());
+ return pcapStatus;
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/main/resources/application.yml
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/resources/application.yml b/metron-interface/metron-rest/src/main/resources/application.yml
index 10c2f50..5fd9d72 100644
--- a/metron-interface/metron-rest/src/main/resources/application.yml
+++ b/metron-interface/metron-rest/src/main/resources/application.yml
@@ -74,5 +74,7 @@ user:
cf: cf
pcap:
- input.path: /apps/metron/pcap
- output.path: /tmp
\ No newline at end of file
+ base.path: /apps/metron/pcap/input
+ base.interim.result.path: /apps/metron/pcap/interim
+ final.output.path: /apps/metron/pcap/output
+ page.size: 10
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index a9e70d2..486a7dc 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -37,9 +37,12 @@ import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.integration.components.KafkaComponent;
import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.job.manager.InMemoryJobManager;
+import org.apache.metron.job.manager.JobManager;
import org.apache.metron.pcap.mr.PcapJob;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.mock.MockPcapJob;
+import org.apache.metron.rest.mock.MockPcapJobSupplier;
import org.apache.metron.rest.mock.MockStormCLIClientWrapper;
import org.apache.metron.rest.mock.MockStormRestTemplate;
import org.apache.metron.rest.service.impl.StormCLIWrapper;
@@ -189,7 +192,19 @@ public class TestConfig {
}
@Bean
- public PcapJob mockPcapJob() {
+ public JobManager jobManager() { // NOTE(review): raw JobManager type, while PcapServiceImpl's constructor takes JobManager<Path>
+ return new InMemoryJobManager();
+ }
+
+ @Bean
+ public MockPcapJob mockPcapJob() { // the return type is now the concrete mock class rather than PcapJob
return new MockPcapJob();
}
+
+ @Bean
+ public PcapJobSupplier pcapJobSupplier(MockPcapJob mockPcapJob) { // builds a mock supplier around the mockPcapJob bean
+ MockPcapJobSupplier mockPcapJobSupplier = new MockPcapJobSupplier();
+ mockPcapJobSupplier.setMockPcapJob(mockPcapJob);
+ return mockPcapJobSupplier;
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java
index 5e4875a..462d83d 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/PcapControllerIntegrationTest.java
@@ -18,9 +18,13 @@
package org.apache.metron.rest.controller;
import org.adrianwalker.multilinestring.Multiline;
+import org.apache.hadoop.fs.Path;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.PcapPages;
import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
import org.apache.metron.rest.mock.MockPcapJob;
import org.apache.metron.rest.model.PcapResponse;
@@ -43,11 +47,14 @@ import java.util.List;
import java.util.Map;
import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static org.hamcrest.Matchers.hasSize;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@RunWith(SpringRunner.class)
@@ -57,23 +64,38 @@ public class PcapControllerIntegrationTest {
/**
{
- "basePath": "/apps/metron/pcap",
- "baseOutputPath": "/tmp",
- "endTime": 10,
+ "basePath": "/base/path",
+ "baseInterimResultPath": "/base/interim/result/path",
+ "finalOutputPath": "/final/output/path",
+ "startTimeMs": 10,
+ "endTimeMs": 20,
+ "numReducers": 2,
"includeReverse": "true",
"ipDstAddr": "192.168.1.1",
"ipDstPort": "1000",
"ipSrcAddr": "192.168.1.2",
"ipSrcPort": "2000",
- "numReducers": 2,
"packetFilter": "filter",
- "protocol": "TCP",
- "startTime": 1
+ "protocol": "TCP"
}
*/
@Multiline
public static String fixedJson;
+ /**
+ {
+ "includeReverse": "true",
+ "ipDstAddr": "192.168.1.1",
+ "ipDstPort": "1000",
+ "ipSrcAddr": "192.168.1.2",
+ "ipSrcPort": "2000",
+ "packetFilter": "filter",
+ "protocol": "TCP"
+ }
+ */
+ @Multiline
+ public static String fixedWithDefaultsJson;
+
@Autowired
private PcapService pcapService;
@@ -84,6 +106,7 @@ public class PcapControllerIntegrationTest {
private String pcapUrl = "/api/v1/pcap";
private String user = "user";
+ private String user2 = "user2";
private String password = "password";
@Before
@@ -98,22 +121,24 @@ public class PcapControllerIntegrationTest {
}
@Test
- public void testFixed() throws Exception {
+ public void testFixedRequest() throws Exception {
MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob");
List<byte[]> results = Arrays.asList("pcap1".getBytes(), "pcap2".getBytes());
mockPcapJob.setResults(results);
+ mockPcapJob.setStatus(new JobStatus().withState(JobStatus.State.RUNNING));
PcapResponse expectedReponse = new PcapResponse();
expectedReponse.setPcaps(results);
this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedJson))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(content().json(JSONUtils.INSTANCE.toJSON(expectedReponse, false)));
+ .andExpect(jsonPath("$.jobStatus").value("RUNNING"));
- Assert.assertEquals("/apps/metron/pcap", mockPcapJob.getBasePath());
- Assert.assertEquals("/tmp", mockPcapJob.getBaseOutputPath());
- Assert.assertEquals(1, mockPcapJob.getStartTime());
- Assert.assertEquals(10, mockPcapJob.getEndTime());
+ Assert.assertEquals("/base/path", mockPcapJob.getBasePath());
+ Assert.assertEquals("/base/interim/result/path", mockPcapJob.getBaseInterrimResultPath());
+ Assert.assertEquals("/final/output/path", mockPcapJob.getFinalOutputPath());
+ Assert.assertEquals(10000000, mockPcapJob.getStartTimeNs());
+ Assert.assertEquals(20000000, mockPcapJob.getEndTimeNs());
Assert.assertEquals(2, mockPcapJob.getNumReducers());
Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
Map<String, String> actualFixedFields = mockPcapJob.getFixedFields();
@@ -124,6 +149,84 @@ public class PcapControllerIntegrationTest {
Assert.assertEquals("true", actualFixedFields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()));
Assert.assertEquals("TCP", actualFixedFields.get(Constants.Fields.PROTOCOL.getName()));
Assert.assertEquals("filter", actualFixedFields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
+ }
+
+
+ @Test
+ public void testFixedRequestDefaults() throws Exception { // Posts a request that carries only filter fields and checks the paths, times and reducer count the mock job receives
+ MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob");
+ mockPcapJob.setStatus(new JobStatus().withState(JobStatus.State.RUNNING));
+ long beforeJobTime = System.currentTimeMillis(); // lower bound for the end time check further down
+
+ this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedWithDefaultsJson))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.jobStatus").value("RUNNING"));
+
+ Assert.assertEquals("/apps/metron/pcap/input", mockPcapJob.getBasePath()); // same value as pcap base.path in application.yml
+ Assert.assertEquals("/apps/metron/pcap/interim", mockPcapJob.getBaseInterrimResultPath()); // same value as pcap base.interim.result.path in application.yml
+ Assert.assertEquals("/apps/metron/pcap/output", mockPcapJob.getFinalOutputPath()); // same value as pcap final.output.path in application.yml
+ Assert.assertEquals(0, mockPcapJob.getStartTimeNs());
+ Assert.assertTrue(beforeJobTime < mockPcapJob.getEndTimeNs() / 1000000); // NOTE(review): strict comparison; presumably the default end time is the current time, so this could fail if both fall in the same millisecond - confirm
+ Assert.assertTrue(System.currentTimeMillis() > mockPcapJob.getEndTimeNs() / 1000000); // NOTE(review): same strictness concern as the line above
+ Assert.assertEquals(10, mockPcapJob.getNumReducers());
+ Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
+ Map<String, String> actualFixedFields = mockPcapJob.getFixedFields();
+ Assert.assertEquals("192.168.1.2", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName()));
+ Assert.assertEquals("2000", actualFixedFields.get(Constants.Fields.SRC_PORT.getName()));
+ Assert.assertEquals("192.168.1.1", actualFixedFields.get(Constants.Fields.DST_ADDR.getName()));
+ Assert.assertEquals("1000", actualFixedFields.get(Constants.Fields.DST_PORT.getName()));
+ Assert.assertEquals("true", actualFixedFields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()));
+ Assert.assertEquals("TCP", actualFixedFields.get(Constants.Fields.PROTOCOL.getName()));
+ Assert.assertEquals("filter", actualFixedFields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
+ }
+
+ @Test
+ public void testGetStatus() throws Exception { // Submits one job, then changes the mocked state step by step and checks what the status endpoint returns each time
+ MockPcapJob mockPcapJob = (MockPcapJob) wac.getBean("mockPcapJob");
+
+ this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password))) // queried before any job has been submitted in this test
+ .andExpect(status().isNotFound());
+
+ mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.RUNNING));
+
+ this.mockMvc.perform(post(pcapUrl + "/fixed").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(fixedJson)) // submits the job whose status is queried below
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.jobId").value("jobId"))
+ .andExpect(jsonPath("$.jobStatus").value("RUNNING"));
+
+ mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.SUCCEEDED));
+
+ Pageable<Path> pageable = new PcapPages(Arrays.asList(new Path("path1"), new Path("path1"))); // NOTE(review): both entries are "path1"; only the page count of 2 is asserted below
+ mockPcapJob.setIsDone(true); // stubbed before the next status request, which asserts pageTotal
+ mockPcapJob.setPageable(pageable);
+ this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password)))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.jobStatus").value("SUCCEEDED"))
+ .andExpect(jsonPath("$.pageTotal").value(2));
+
+ mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.FINALIZING));
+
+ this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password)))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.jobStatus").value("FINALIZING"));
+
+ mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.FAILED));
+
+ this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password))) // a failed job is still expected to answer 200 with its state in the body
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.jobStatus").value("FAILED"));
+
+ mockPcapJob.setStatus(new JobStatus().withJobId("jobId").withState(JobStatus.State.KILLED));
+
+ this.mockMvc.perform(get(pcapUrl + "/jobId").with(httpBasic(user, password)))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.jobStatus").value("KILLED"));
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
index a7eca31..df65635 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJob.java
@@ -17,47 +17,79 @@
*/
package org.apache.metron.rest.mock;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.metron.common.hadoop.SequenceFileIterable;
+import org.apache.metron.job.Finalizer;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.Statusable;
+import org.apache.metron.pcap.config.PcapOptions;
import org.apache.metron.pcap.filter.PcapFilterConfigurator;
import org.apache.metron.pcap.mr.PcapJob;
-public class MockPcapJob extends PcapJob {
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MockPcapJob extends PcapJob<Path> {
private String basePath;
- private String baseOutputPath;
- private long beginNS;
- private long endNS;
+ private String baseInterrimResultPath;
+ private String finalOutputPath;
+ private long startTimeNs;
+ private long endTimeNs;
private int numReducers;
private Map<String, String> fixedFields;
private PcapFilterConfigurator filterImpl;
+ private int recPerFile;
private SequenceFileIterable sequenceFileIterable;
+ private Statusable<Path> statusable;
public MockPcapJob() {
sequenceFileIterable = mock(SequenceFileIterable.class);
+ statusable = mock(Statusable.class); // Mockito mock that submit() returns and that getStatus()/get() delegate to
}
- @SuppressWarnings(value = "unchecked")
@Override
- public <T> SequenceFileIterable query(Path basePath, Path baseOutputPath, long beginNS, long endNS, int numReducers, T fields, Configuration conf, FileSystem fs, PcapFilterConfigurator<T> filterImpl) throws IOException, ClassNotFoundException, InterruptedException {
- this.basePath = basePath.toString();
- this.baseOutputPath = baseOutputPath.toString();
- this.beginNS = beginNS;
- this.endNS = endNS;
- this.numReducers = numReducers;
+ public Statusable<Path> submit(Finalizer<Path> finalizer, Map<String, Object> configuration) throws JobException { // Records the options from the configuration map for later assertions and returns the mocked Statusable; the finalizer argument is not used
+ this.basePath = PcapOptions.BASE_PATH.get(configuration, String.class);
+ this.baseInterrimResultPath = PcapOptions.BASE_INTERIM_RESULT_PATH.get(configuration, String.class);
+ this.finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(configuration, String.class);
+ this.startTimeNs = PcapOptions.START_TIME_MS.get(configuration, Long.class) * 1000000; // stored in nanoseconds: milliseconds times 1,000,000
+ this.endTimeNs = PcapOptions.END_TIME_MS.get(configuration, Long.class) * 1000000; // stored in nanoseconds: milliseconds times 1,000,000
+ this.numReducers = PcapOptions.NUM_REDUCERS.get(configuration, Integer.class);
+ Object fields = PcapOptions.FIELDS.get(configuration, Object.class); // only kept below when it is a Map
if (fields instanceof Map) {
this.fixedFields = (Map<String, String>) fields;
}
- this.filterImpl = filterImpl;
- return sequenceFileIterable;
+ this.filterImpl = PcapOptions.FILTER_IMPL.get(configuration, PcapFilterConfigurator.class);
+ this.recPerFile = PcapOptions.NUM_RECORDS_PER_FILE.get(configuration, Integer.class);
+ return statusable;
+ }
+
+ @Override
+ public JobStatus getStatus() throws JobException { // Delegates to the mocked Statusable
+ return statusable.getStatus();
+ }
+
+ @Override
+ public Pageable<Path> get() throws JobException, InterruptedException { // Delegates to the mocked Statusable
+ return statusable.get();
+ }
+
+ public void setStatus(JobStatus jobStatus) throws JobException { // Stubs the mocked Statusable so getStatus() returns the given status
+ when(statusable.getStatus()).thenReturn(jobStatus);
+ }
+
+ public void setPageable(Pageable<Path> pageable) throws JobException, InterruptedException { // Stubs the mocked Statusable so get() returns the given pageable
+ when(statusable.get()).thenReturn(pageable);
+ }
+
+ public void setIsDone(boolean isDone) { // Stubs the mocked Statusable so isDone() returns the given flag
+ when(statusable.isDone()).thenReturn(isDone);
}
public void setResults(List<byte[]> pcaps) {
@@ -68,16 +100,32 @@ public class MockPcapJob extends PcapJob {
return basePath;
}
- public String getBaseOutputPath() {
- return baseOutputPath;
+ public void setBasePath(String basePath) { // plain setter; submit() also assigns this field from the job configuration
+ this.basePath = basePath;
+ }
+
+ public String getBaseInterrimResultPath() { // NOTE(review): "Interrim" is misspelled in this field and its accessors; a rename would also touch the tests that call them
+ return baseInterrimResultPath;
}
- public long getStartTime() {
- return beginNS / 1000000;
+ public void setBaseInterrimResultPath(String baseInterrimResultPath) {
+ this.baseInterrimResultPath = baseInterrimResultPath;
}
- public long getEndTime() {
- return endNS / 1000000;
+ public String getFinalOutputPath() {
+ return finalOutputPath;
+ }
+
+ public void setFinalOutputPath(String finalOutputPath) {
+ this.finalOutputPath = finalOutputPath;
+ }
+
+ public long getStartTimeNs() { // returns the stored nanosecond value as is; the removed getStartTime() divided by 1000000
+ return startTimeNs;
+ }
+
+ public long getEndTimeNs() { // returns the stored nanosecond value as is; the removed getEndTime() divided by 1000000
+ return endTimeNs;
}
public int getNumReducers() {
@@ -91,4 +139,8 @@ public class MockPcapJob extends PcapJob {
public PcapFilterConfigurator getFilterImpl() {
return filterImpl;
}
+
+ public int getRecPerFile() { // Returns the recPerFile field
+ return recPerFile;
+ }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java
new file mode 100644
index 0000000..9a1ac7f
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/mock/MockPcapJobSupplier.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.rest.mock;
+
+import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.metron.rest.config.PcapJobSupplier;
+
+public class MockPcapJobSupplier extends PcapJobSupplier { // Test supplier whose createPcapJob() returns a MockPcapJob held in a field
+
+ private MockPcapJob mockPcapJob = new MockPcapJob(); // default instance; can be replaced through setMockPcapJob
+
+ @Override
+ protected PcapJob createPcapJob() { // returns the held mock instance rather than constructing a new job per call
+ return mockPcapJob;
+ }
+
+ public void setMockPcapJob(MockPcapJob mockPcapJob) { // lets a test supply the mock it wants to inspect afterwards
+ this.mockPcapJob = mockPcapJob;
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
index 1a11c79..2b6bea3 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/PcapServiceImplTest.java
@@ -17,12 +17,23 @@
*/
package org.apache.metron.rest.service.impl;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.metron.pcap.mr.PcapJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.metron.common.Constants;
+import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobStatus;
+import org.apache.metron.job.Pageable;
+import org.apache.metron.job.manager.InMemoryJobManager;
+import org.apache.metron.job.manager.JobManager;
+import org.apache.metron.pcap.PcapHelper;
+import org.apache.metron.pcap.filter.fixed.FixedPcapFilter;
import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.config.PcapJobSupplier;
+import org.apache.metron.rest.mock.MockPcapJob;
+import org.apache.metron.rest.mock.MockPcapJobSupplier;
+import org.apache.metron.rest.model.pcap.FixedPcapRequest;
+import org.apache.metron.rest.model.pcap.PcapStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -30,6 +41,15 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.springframework.core.env.Environment;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
@SuppressWarnings("ALL")
public class PcapServiceImplTest {
@Rule
@@ -37,32 +57,28 @@ public class PcapServiceImplTest {
Environment environment;
Configuration configuration;
- PcapJob pcapJob;
+ MockPcapJobSupplier mockPcapJobSupplier;
@Before
public void setUp() throws Exception {
environment = mock(Environment.class);
- pcapJob = mock(PcapJob.class);
configuration = mock(Configuration.class);
+ mockPcapJobSupplier = new MockPcapJobSupplier(); // new supplier for each test
- when(environment.getProperty(MetronRestConstants.PCAP_INPUT_PATH_SPRING_PROPERTY)).thenReturn("/input/path");
- when(environment.getProperty(MetronRestConstants.PCAP_OUTPUT_PATH_SPRING_PROPERTY)).thenReturn("/output/path");
+ when(environment.getProperty(MetronRestConstants.PCAP_BASE_PATH_SPRING_PROPERTY)).thenReturn("/base/path"); // stubbed environment value for the base path property
+ when(environment.getProperty(MetronRestConstants.PCAP_BASE_INTERIM_RESULT_PATH_SPRING_PROPERTY)).thenReturn("/base/interim/result/path"); // stubbed environment value for the interim result path property
+ when(environment.getProperty(MetronRestConstants.PCAP_FINAL_OUTPUT_PATH_SPRING_PROPERTY)).thenReturn("/final/output/path"); // stubbed environment value for the final output path property
+ when(environment.getProperty(MetronRestConstants.PCAP_PAGE_SIZE_SPRING_PROPERTY)).thenReturn("100"); // returned as a String, matching Environment.getProperty
}
- // TODO
-
- @Test
- public void placeholder() {
- Assert.assertTrue(true);
- }
-/*
@Test
public void fixedShouldProperlyCallPcapJobQuery() throws Exception {
FixedPcapRequest fixedPcapRequest = new FixedPcapRequest();
- fixedPcapRequest.setBaseOutputPath("baseOutputPath");
fixedPcapRequest.setBasePath("basePath");
- fixedPcapRequest.setStartTime(1L);
- fixedPcapRequest.setEndTime(2L);
+ fixedPcapRequest.setBaseInterimResultPath("baseOutputPath");
+ fixedPcapRequest.setFinalOutputPath("finalOutputPath");
+ fixedPcapRequest.setStartTimeMs(1L);
+ fixedPcapRequest.setEndTimeMs(2L);
fixedPcapRequest.setNumReducers(2);
fixedPcapRequest.setIpSrcAddr("ip_src_addr");
fixedPcapRequest.setIpDstAddr("ip_dst_addr");
@@ -71,10 +87,19 @@ public class PcapServiceImplTest {
fixedPcapRequest.setProtocol("tcp");
fixedPcapRequest.setPacketFilter("filter");
fixedPcapRequest.setIncludeReverse(true);
+ MockPcapJob mockPcapJob = new MockPcapJob();
+ mockPcapJobSupplier.setMockPcapJob(mockPcapJob);
+ JobManager jobManager = new InMemoryJobManager<>();
- PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJob));
+ PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager));
FileSystem fileSystem = mock(FileSystem.class);
doReturn(fileSystem).when(pcapService).getFileSystem();
+ mockPcapJob.setStatus(new JobStatus()
+ .withJobId("jobId")
+ .withDescription("description")
+ .withPercentComplete(0L)
+ .withState(JobStatus.State.RUNNING));
+
Map<String, String> expectedFields = new HashMap<String, String>() {{
put(Constants.Fields.SRC_ADDR.getName(), "ip_src_addr");
put(Constants.Fields.DST_ADDR.getName(), "ip_dst_addr");
@@ -84,72 +109,128 @@ public class PcapServiceImplTest {
put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "true");
put(PcapHelper.PacketFields.PACKET_FILTER.getName(), "filter");
}};
- List<byte[]> expectedPcaps = Arrays.asList("pcap1".getBytes(), "pcap2".getBytes());
- SequenceFileIterable results = mock(SequenceFileIterable.class);
- when(results.iterator()).thenReturn(expectedPcaps.iterator());
- when(pcapJob.query(eq(new Path("basePath")),
- eq(new Path("baseOutputPath")),
- eq(1000000L),
- eq(2000000L),
- eq(2),
- eq(expectedFields),
- eq(configuration),
- any(FileSystem.class),
- any(FixedPcapFilter.Configurator.class))).thenReturn(results);
-
- PcapResponse pcapsResponse = pcapService.fixed(fixedPcapRequest);
- Assert.assertEquals(expectedPcaps, pcapsResponse.getPcaps());
+ PcapStatus expectedPcapStatus = new PcapStatus();
+ expectedPcapStatus.setJobId("jobId");
+ expectedPcapStatus.setJobStatus(JobStatus.State.RUNNING.name());
+ expectedPcapStatus.setDescription("description");
+
+ Assert.assertEquals(expectedPcapStatus, pcapService.fixed("user", fixedPcapRequest));
+ Assert.assertEquals(expectedPcapStatus, pcapService.jobStatusToPcapStatus(jobManager.getJob("user", "jobId").getStatus()));
+ Assert.assertEquals("basePath", mockPcapJob.getBasePath());
+ Assert.assertEquals("baseOutputPath", mockPcapJob.getBaseInterrimResultPath());
+ Assert.assertEquals("finalOutputPath", mockPcapJob.getFinalOutputPath());
+ Assert.assertEquals(1000000, mockPcapJob.getStartTimeNs());
+ Assert.assertEquals(2000000, mockPcapJob.getEndTimeNs());
+ Assert.assertEquals(2, mockPcapJob.getNumReducers());
+ Assert.assertEquals(100, mockPcapJob.getRecPerFile());
+ Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
+ Map<String, String> actualFixedFields = mockPcapJob.getFixedFields();
+ Assert.assertEquals("ip_src_addr", actualFixedFields.get(Constants.Fields.SRC_ADDR.getName()));
+ Assert.assertEquals("1000", actualFixedFields.get(Constants.Fields.SRC_PORT.getName()));
+ Assert.assertEquals("ip_dst_addr", actualFixedFields.get(Constants.Fields.DST_ADDR.getName()));
+ Assert.assertEquals("2000", actualFixedFields.get(Constants.Fields.DST_PORT.getName()));
+ Assert.assertEquals("true", actualFixedFields.get(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName()));
+ Assert.assertEquals("tcp", actualFixedFields.get(Constants.Fields.PROTOCOL.getName()));
+ Assert.assertEquals("filter", actualFixedFields.get(PcapHelper.PacketFields.PACKET_FILTER.getName()));
}
@Test
public void fixedShouldProperlyCallPcapJobQueryWithDefaults() throws Exception {
+ long beforeJobTime = System.currentTimeMillis();
+
FixedPcapRequest fixedPcapRequest = new FixedPcapRequest();
+ MockPcapJob mockPcapJob = new MockPcapJob();
+ mockPcapJobSupplier.setMockPcapJob(mockPcapJob);
+ JobManager jobManager = new InMemoryJobManager<>();
- PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJob));
+ PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager));
FileSystem fileSystem = mock(FileSystem.class);
doReturn(fileSystem).when(pcapService).getFileSystem();
- Map<String, String> expectedFields = new HashMap<String, String>() {{
- put(Constants.Fields.INCLUDES_REVERSE_TRAFFIC.getName(), "false");
- }};
- List<byte[]> expectedPcaps = Arrays.asList("pcap1".getBytes(), "pcap2".getBytes());
- SequenceFileIterable results = mock(SequenceFileIterable.class);
- when(results.iterator()).thenReturn(expectedPcaps.iterator());
- when(pcapJob.query(eq(new Path("/input/path")),
- eq(new Path("/output/path")),
- eq(0L),
- eq(fixedPcapRequest.getEndTime() * 1000000),
- eq(1),
- eq(expectedFields),
- eq(configuration),
- any(FileSystem.class),
- any(FixedPcapFilter.Configurator.class))).thenReturn(results);
-
- PcapResponse pcapsResponse = pcapService.fixed(fixedPcapRequest);
- Assert.assertEquals(expectedPcaps, pcapsResponse.getPcaps());
+ mockPcapJob.setStatus(new JobStatus()
+ .withJobId("jobId")
+ .withDescription("description")
+ .withPercentComplete(0L)
+ .withState(JobStatus.State.RUNNING));
+
+ PcapStatus expectedPcapStatus = new PcapStatus();
+ expectedPcapStatus.setJobId("jobId");
+ expectedPcapStatus.setJobStatus(JobStatus.State.RUNNING.name());
+ expectedPcapStatus.setDescription("description");
+
+ Assert.assertEquals(expectedPcapStatus, pcapService.fixed("user", fixedPcapRequest));
+ Assert.assertEquals("/base/path", mockPcapJob.getBasePath());
+ Assert.assertEquals("/base/interim/result/path", mockPcapJob.getBaseInterrimResultPath());
+ Assert.assertEquals("/final/output/path", mockPcapJob.getFinalOutputPath());
+ Assert.assertEquals(0, mockPcapJob.getStartTimeNs());
+ Assert.assertTrue(beforeJobTime <= mockPcapJob.getEndTimeNs() / 1000000);
+ Assert.assertTrue(System.currentTimeMillis() >= mockPcapJob.getEndTimeNs() / 1000000);
+ Assert.assertEquals(10, mockPcapJob.getNumReducers());
+ Assert.assertEquals(100, mockPcapJob.getRecPerFile());
+ Assert.assertTrue(mockPcapJob.getFilterImpl() instanceof FixedPcapFilter.Configurator);
+ Assert.assertEquals(new HashMap<>(), mockPcapJob.getFixedFields());
}
@Test
public void fixedShouldThrowRestException() throws Exception {
exception.expect(RestException.class);
- exception.expectMessage("some exception");
+ exception.expectMessage("some job exception");
FixedPcapRequest fixedPcapRequest = new FixedPcapRequest();
-
- PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJob));
+ JobManager jobManager = mock(JobManager.class);
+ PcapJobSupplier pcapJobSupplier = new PcapJobSupplier();
+ PcapServiceImpl pcapService = spy(new PcapServiceImpl(environment, configuration, pcapJobSupplier, jobManager));
FileSystem fileSystem = mock(FileSystem.class);
doReturn(fileSystem).when(pcapService).getFileSystem();
+ when(jobManager.submit(pcapJobSupplier, "user")).thenThrow(new JobException("some job exception"));
+
+ pcapService.fixed("user", fixedPcapRequest);
+ }
+
+ @Test
+ public void getStatusShouldProperlyReturnStatus() throws Exception {
+ MockPcapJob mockPcapJob = mock(MockPcapJob.class);
+ JobManager jobManager = mock(JobManager.class);
+ JobStatus actualJobStatus = new JobStatus()
+ .withJobId("jobId")
+ .withState(JobStatus.State.SUCCEEDED)
+ .withDescription("description")
+ .withPercentComplete(100.0);
+ Pageable pageable = mock(Pageable.class);
+ when(pageable.getSize()).thenReturn(2);
+ when(mockPcapJob.getStatus()).thenReturn(actualJobStatus);
+ when(mockPcapJob.isDone()).thenReturn(true);
+ when(mockPcapJob.get()).thenReturn(pageable);
+ when(jobManager.getJob("user", "jobId")).thenReturn(mockPcapJob);
+
+ PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, mockPcapJobSupplier, jobManager);
+ PcapStatus expectedPcapStatus = new PcapStatus();
+ expectedPcapStatus.setJobId("jobId");
+ expectedPcapStatus.setJobStatus(JobStatus.State.SUCCEEDED.name());
+ expectedPcapStatus.setDescription("description");
+ expectedPcapStatus.setPercentComplete(100.0);
+ expectedPcapStatus.setPageTotal(2);
+
+ Assert.assertEquals(expectedPcapStatus, pcapService.getJobStatus("user", "jobId"));
+ }
+
+ @Test
+ public void getStatusShouldReturnNullOnMissingStatus() throws Exception {
+ JobManager jobManager = new InMemoryJobManager();
+ PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, new PcapJobSupplier(), jobManager);
- when(pcapJob.query(any(),
- any(),
- eq(0L),
- eq(fixedPcapRequest.getEndTime() * 1000000),
- eq(1),
- any(),
- any(),
- any(FileSystem.class),
- any(FixedPcapFilter.Configurator.class))).thenThrow(new IOException("some exception"));
-
- pcapService.fixed(fixedPcapRequest);
+ Assert.assertNull(pcapService.getJobStatus("user", "jobId"));
}
- */
+
+ @Test
+ public void getStatusShouldThrowRestException() throws Exception {
+ exception.expect(RestException.class);
+ exception.expectMessage("some job exception");
+
+ JobManager jobManager = mock(JobManager.class);
+ when(jobManager.getJob("user", "jobId")).thenThrow(new JobException("some job exception"));
+
+ PcapServiceImpl pcapService = new PcapServiceImpl(environment, configuration, new PcapJobSupplier(), jobManager);
+ pcapService.getJobStatus("user", "jobId");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
index 473664c..8e4211b 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
@@ -18,6 +18,8 @@
package org.apache.metron.common.configuration;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+
import java.util.Map;
import java.util.function.BiFunction;
@@ -32,11 +34,17 @@ public interface ConfigOption {
}
default <T> T get(Map<String, Object> map, Class<T> clazz) {
- return clazz.cast(map.get(getKey()));
+ Object obj = map.get(getKey());
+ if(clazz.isInstance(obj)) {
+ return clazz.cast(obj);
+ }
+ else {
+ return ConversionUtils.convert(obj, clazz);
+ }
}
default <T> T get(Map<String, Object> map, BiFunction<String, Object, T> transform, Class<T> clazz) {
- return clazz.cast(map.get(getKey()));
+ return clazz.cast(transform.apply(getKey(), map.get(getKey())));
}
default <T> T getTransformed(Map<String, Object> map, Class<T> clazz) {
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java
new file mode 100644
index 0000000..6a677bf
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobNotFoundException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+public class JobNotFoundException extends JobException {
+
+ public JobNotFoundException(String message) {
+ super(message);
+ }
+
+ public JobNotFoundException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java
new file mode 100644
index 0000000..9013ef8
--- /dev/null
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/RuntimeJobException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.job;
+
+public class RuntimeJobException extends RuntimeException {
+
+ public RuntimeJobException(String message) {
+ super(message);
+ }
+
+ public RuntimeJobException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
index bf0baa7..1340aa5 100644
--- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
+++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/manager/InMemoryJobManager.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.metron.job.JobException;
+import org.apache.metron.job.JobNotFoundException;
import org.apache.metron.job.JobStatus;
import org.apache.metron.job.Statusable;
import org.slf4j.Logger;
@@ -52,7 +53,7 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> {
@Override
public JobStatus getStatus(String username, String jobId) throws JobException {
- return jobs.get(username).get(jobId).getStatus();
+ return getJob(username, jobId).getStatus();
}
@Override
@@ -67,7 +68,11 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> {
@Override
public Statusable<PAGE_T> getJob(String username, String jobId) throws JobException {
- return getUserJobs(username).get(jobId);
+ Map<String, Statusable<PAGE_T>> jobStatusables = getUserJobs(username);
+ if (jobStatusables.size() > 0 && jobStatusables.containsKey(jobId)) {
+ return jobStatusables.get(jobId);
+ }
+ throw new JobNotFoundException("Could not find job " + jobId + " for user " + username);
}
private Map<String, Statusable<PAGE_T>> getUserJobs(String username) {
@@ -76,7 +81,7 @@ public class InMemoryJobManager<PAGE_T> implements JobManager<PAGE_T> {
@Override
public List<Statusable<PAGE_T>> getJobs(String username) throws JobException {
- return new ArrayList<Statusable<PAGE_T>>(getUserJobs(username).values());
+ return new ArrayList<>(getUserJobs(username).values());
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
index 3462921..1a23740 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/pcap/query/PcapCli.java
@@ -98,12 +98,6 @@ public class PcapCli {
fixedParser.printHelp();
return 0;
}
- Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs());
- long startTime = time.getLeft();
- long endTime = time.getRight();
-
- PcapOptions.START_TIME_NS.put(commonConfig, startTime);
- PcapOptions.END_TIME_NS.put(commonConfig, endTime);
PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
try {
@@ -128,12 +122,6 @@ public class PcapCli {
queryParser.printHelp();
return 0;
}
- Pair<Long, Long> time = timeAsNanosecondsSinceEpoch(config.getStartTimeMs(), config.getEndTimeMs());
- long startTime = time.getLeft();
- long endTime = time.getRight();
-
- PcapOptions.START_TIME_NS.put(commonConfig, startTime);
- PcapOptions.END_TIME_NS.put(commonConfig, endTime);
PcapOptions.FILTER_IMPL.put(commonConfig, new FixedPcapFilter.Configurator());
PcapOptions.HADOOP_CONF.put(commonConfig, hadoopConf);
try {
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 9ea7912..0be33d6 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -615,8 +615,10 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest {
private void waitForJob(Statusable statusable) throws Exception {
for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) {
- if (statusable.isDone()) {
- return;
+ if (!statusable.getStatus().getState().equals(JobStatus.State.RUNNING)) {
+ if (statusable.isDone()) {
+ return;
+ }
}
}
throw new Exception("Job did not complete within " + (MAX_RETRIES * SLEEP_MS) + " seconds");
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
index 763f0c6..c7d6fdf 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/query/PcapCliTest.java
@@ -208,8 +208,6 @@ public class PcapCliTest {
PcapOptions.NUM_REDUCERS.put(config, 10);
PcapOptions.START_TIME_MS.put(config, startAsNanos / 1000000L); // needed bc defaults in config
PcapOptions.END_TIME_MS.put(config, endAsNanos / 1000000L); // needed bc defaults in config
- PcapOptions.START_TIME_NS.put(config, startAsNanos);
- PcapOptions.END_TIME_NS.put(config, endAsNanos);
PcapOptions.NUM_RECORDS_PER_FILE.put(config, 1000);
when(jobRunner.submit(isA(Finalizer.class), argThat(mapContaining(config)))).thenReturn(jobRunner);
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
index 09effd4..3d7c4f6 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/config/PcapOptions.java
@@ -24,6 +24,8 @@ import org.apache.metron.common.configuration.ConfigOption;
public enum PcapOptions implements ConfigOption {
JOB_NAME("jobName"),
+ JOB_ID("jobId"),
+ USERNAME("username"),
FINAL_FILENAME_PREFIX("finalFilenamePrefix"),
BASE_PATH("basePath", (s, o) -> o == null ? null : new Path(o.toString())),
INTERIM_RESULT_PATH("interimResultPath", (s, o) -> o == null ? null : new Path(o.toString())),
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
index e032158..c379515 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapCliFinalizer.java
@@ -36,10 +36,10 @@ public class PcapCliFinalizer extends PcapFinalizer {
private static final String PCAP_CLI_FILENAME_FORMAT = "%s/pcap-data-%s+%04d.pcap";
@Override
- protected String getOutputFileName(Map<String, Object> config, int partition) {
+ protected Path getOutputPath(Map<String, Object> config, int partition) {
Path finalOutputPath = PcapOptions.FINAL_OUTPUT_PATH.get(config, PcapOptions.STRING_TO_PATH, Path.class);
String prefix = PcapOptions.FINAL_FILENAME_PREFIX.get(config, String.class);
- return String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition);
+ return new Path(String.format(PCAP_CLI_FILENAME_FORMAT, finalOutputPath, prefix, partition));
}
}
http://git-wip-us.apache.org/repos/asf/metron/blob/39ae9f46/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
index d5ac675..2c55e15 100644
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
+++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
@@ -79,10 +79,10 @@ public abstract class PcapFinalizer implements Finalizer<Path> {
int part = 1;
if (partitions.iterator().hasNext()) {
for (List<byte[]> data : partitions) {
- String outFileName = getOutputFileName(config, part++);
+ Path outputPath = getOutputPath(config, part++);
if (data.size() > 0) {
- getResultsWriter().write(hadoopConfig, data, outFileName);
- outFiles.add(new Path(outFileName));
+ getResultsWriter().write(hadoopConfig, data, outputPath.toUri().getPath());
+ outFiles.add(outputPath);
}
}
} else {
@@ -100,7 +100,7 @@ public abstract class PcapFinalizer implements Finalizer<Path> {
return new PcapPages(outFiles);
}
- protected abstract String getOutputFileName(Map<String, Object> config, int partition);
+ protected abstract Path getOutputPath(Map<String, Object> config, int partition);
/**
* Returns a lazily-read Iterable over a set of sequence files.