You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2017/03/02 20:51:53 UTC
[07/10] incubator-metron git commit: METRON-503: Metron REST API this
closes apache/incubator-metron#316
METRON-503: Metron REST API this closes apache/incubator-metron#316
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/a61dbcf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/a61dbcf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/a61dbcf7
Branch: refs/heads/master
Commit: a61dbcf7ed69768b2342b292460815e5eee0db07
Parents: 55b3e7e
Author: JJ <jj...@gmail.com>
Authored: Sun Jan 29 19:31:37 2017 -0600
Committer: cstella <ce...@gmail.com>
Committed: Thu Mar 2 10:56:56 2017 -0500
----------------------------------------------------------------------
.../metron/rest/model/GrokValidation.java | 20 ++
.../apache/metron/rest/model/KafkaTopic.java | 22 ++
.../apache/metron/rest/config/KafkaConfig.java | 8 +-
.../apache/metron/rest/config/StormConfig.java | 2 +-
.../metron/rest/controller/GrokController.java | 2 +-
.../metron/rest/service/KafkaService.java | 2 +-
.../service/impl/DockerStormCLIWrapper.java | 30 +-
.../service/impl/GlobalConfigServiceImpl.java | 5 +-
.../rest/service/impl/GrokServiceImpl.java | 14 +-
.../rest/service/impl/KafkaServiceImpl.java | 52 ++--
.../apache/metron/rest/config/TestConfig.java | 8 +-
.../GrokControllerIntegrationTest.java | 2 +-
.../metron/rest/service/HdfsServiceTest.java | 117 --------
.../service/impl/DockerStormCLIWrapperTest.java | 83 ++++++
.../impl/GlobalConfigServiceImplTest.java | 154 ++++++++++
.../rest/service/impl/GrokServiceImplTest.java | 151 ++++++++++
.../rest/service/impl/HdfsServiceImplTest.java | 117 ++++++++
.../rest/service/impl/KafkaServiceImplTest.java | 292 +++++++++++++++++++
18 files changed, 913 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java
index 7fbcd34..ccd2c5c 100644
--- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java
@@ -52,4 +52,24 @@ public class GrokValidation {
public void setResults(Map<String, Object> results) {
this.results = results;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ GrokValidation that = (GrokValidation) o;
+
+ if (statement != null ? !statement.equals(that.statement) : that.statement != null) return false;
+ if (sampleData != null ? !sampleData.equals(that.sampleData) : that.sampleData != null) return false;
+ return results != null ? results.equals(that.results) : that.results == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = statement != null ? statement.hashCode() : 0;
+ result = 31 * result + (sampleData != null ? sampleData.hashCode() : 0);
+ result = 31 * result + (results != null ? results.hashCode() : 0);
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java
index c8db9f6..55dd2b2 100644
--- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java
@@ -57,4 +57,26 @@ public class KafkaTopic {
public void setProperties(Properties properties) {
this.properties = properties;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ KafkaTopic that = (KafkaTopic) o;
+
+ if (numPartitions != that.numPartitions) return false;
+ if (replicationFactor != that.replicationFactor) return false;
+ if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ return properties != null ? properties.equals(that.properties) : that.properties == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = name != null ? name.hashCode() : 0;
+ result = 31 * result + numPartitions;
+ result = 31 * result + replicationFactor;
+ result = 31 * result + (properties != null ? properties.hashCode() : 0);
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
index 8044405..f6ff73c 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
@@ -17,6 +17,7 @@
*/
package org.apache.metron.rest.config;
+import kafka.admin.AdminUtils$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,7 +47,7 @@ public class KafkaConfig {
return ZkUtils.apply(zkClient, false);
}
- @Bean(destroyMethod="close")
+ @Bean(destroyMethod = "close")
public KafkaConsumer<String, String> kafkaConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
@@ -58,4 +59,9 @@ public class KafkaConfig {
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<>(props);
}
+
+ @Bean
+ public AdminUtils$ adminUtils() {
+ return AdminUtils$.MODULE$;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
index d6d0cff..7a61cbc 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
@@ -40,7 +40,7 @@ public class StormConfig {
@Bean
public StormCLIWrapper stormCLIClientWrapper() {
if (Arrays.asList(environment.getActiveProfiles()).contains(DOCKER_PROFILE)) {
- return new DockerStormCLIWrapper();
+ return new DockerStormCLIWrapper(environment);
} else {
return new StormCLIWrapper();
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java
index 2b155b1..d561897 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java
@@ -43,7 +43,7 @@ public class GrokController {
@ApiOperation(value = "Applies a Grok statement to a sample message")
@ApiResponse(message = "JSON results", code = 200)
@RequestMapping(value = "/validate", method = RequestMethod.POST)
- ResponseEntity<GrokValidation> post(@ApiParam(name="grokValidation", value="Object containing Grok statment and sample message", required=true)@RequestBody GrokValidation grokValidation) throws RestException {
+ ResponseEntity<GrokValidation> post(@ApiParam(name = "grokValidation", value = "Object containing Grok statement and sample message", required = true) @RequestBody GrokValidation grokValidation) throws RestException {
return new ResponseEntity<>(grokService.validateGrokStatement(grokValidation), HttpStatus.OK);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
index e89b165..f3cd901 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
@@ -23,6 +23,7 @@ import org.apache.metron.rest.model.KafkaTopic;
import java.util.Set;
public interface KafkaService {
+ String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
KafkaTopic createTopic(KafkaTopic topic) throws RestException;
@@ -33,5 +34,4 @@ public interface KafkaService {
Set<String> listTopics();
String getSampleMessage(String topic);
-
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
index d7bbb23..059afd7 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
@@ -30,30 +30,34 @@ import java.util.Map;
public class DockerStormCLIWrapper extends StormCLIWrapper {
- private Logger LOG = LoggerFactory.getLogger(DockerStormCLIWrapper.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DockerStormCLIWrapper.class);
- @Autowired
private Environment environment;
+ @Autowired
+ public DockerStormCLIWrapper(final Environment environment) {
+ this.environment = environment;
+ }
+
@Override
- protected ProcessBuilder getProcessBuilder(String... command) {
- String[] dockerCommand = {"docker-compose", "-f", environment.getProperty("docker.compose.path"), "-p", "metron", "exec", "storm"};
- ProcessBuilder pb = new ProcessBuilder(ArrayUtils.addAll(dockerCommand, command));
- Map<String, String> pbEnvironment = pb.environment();
+ protected ProcessBuilder getProcessBuilder(final String... command) {
+ final String[] dockerCommand = {"docker-compose", "-f", environment.getProperty("docker.compose.path"), "-p", "metron", "exec", "storm"};
+ final ProcessBuilder pb = new ProcessBuilder(ArrayUtils.addAll(dockerCommand, command));
+ final Map<String, String> pbEnvironment = pb.environment();
pbEnvironment.put("METRON_VERSION", environment.getProperty("metron.version"));
setDockerEnvironment(pbEnvironment);
return pb;
}
- protected void setDockerEnvironment(Map<String, String> environmentVariables) {
- ProcessBuilder pb = getDockerEnvironmentProcessBuilder();
+ private void setDockerEnvironment(final Map<String, String> environmentVariables) {
+ final ProcessBuilder pb = getDockerEnvironmentProcessBuilder();
try {
- Process process = pb.start();
- BufferedReader inputStream = new BufferedReader(new InputStreamReader(process.getInputStream()));
+ final Process process = pb.start();
+ final BufferedReader inputStream = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line;
- while((line = inputStream.readLine()) != null) {
+ while ((line = inputStream.readLine()) != null) {
if (line.startsWith("export")) {
- String[] parts = line.replaceFirst("export ", "").split("=");
+ final String[] parts = line.replaceFirst("export ", "").split("=");
environmentVariables.put(parts[0], parts[1].replaceAll("\"", ""));
}
}
@@ -63,7 +67,7 @@ public class DockerStormCLIWrapper extends StormCLIWrapper {
}
}
- protected ProcessBuilder getDockerEnvironmentProcessBuilder() {
+ private ProcessBuilder getDockerEnvironmentProcessBuilder() {
String[] command = {"docker-machine", "env", "metron-machine"};
return new ProcessBuilder(command);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
index 54c331a..e80380b 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
@@ -33,9 +33,12 @@ import java.util.Map;
@Service
public class GlobalConfigServiceImpl implements GlobalConfigService {
+ private CuratorFramework client;
@Autowired
- private CuratorFramework client;
+ public GlobalConfigServiceImpl(CuratorFramework client) {
+ this.client = client;
+ }
@Override
public Map<String, Object> save(Map<String, Object> globalConfig) throws RestException {
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
index 8fbea13..323ca78 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.metron.rest.service.impl;
import oi.thekraken.grok.api.Grok;
import oi.thekraken.grok.api.Match;
+import org.apache.directory.api.util.Strings;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.GrokValidation;
import org.apache.metron.rest.service.GrokService;
@@ -31,9 +32,12 @@ import java.util.Map;
@Service
public class GrokServiceImpl implements GrokService {
+ private Grok commonGrok;
@Autowired
- private Grok commonGrok;
+ public GrokServiceImpl(Grok commonGrok) {
+ this.commonGrok = commonGrok;
+ }
@Override
public Map<String, String> getCommonGrokPatterns() {
@@ -44,10 +48,12 @@ public class GrokServiceImpl implements GrokService {
public GrokValidation validateGrokStatement(GrokValidation grokValidation) throws RestException {
Map<String, Object> results;
try {
+ String statement = Strings.isEmpty(grokValidation.getStatement()) ? "" : grokValidation.getStatement();
+
Grok grok = new Grok();
grok.addPatternFromReader(new InputStreamReader(getClass().getResourceAsStream("/patterns/common")));
- grok.addPatternFromReader(new StringReader(grokValidation.getStatement()));
- String patternLabel = grokValidation.getStatement().substring(0, grokValidation.getStatement().indexOf(" "));
+ grok.addPatternFromReader(new StringReader(statement));
+ String patternLabel = statement.substring(0, statement.indexOf(" "));
String grokPattern = "%{" + patternLabel + "}";
grok.compile(grokPattern);
Match gm = grok.match(grokValidation.getSampleData());
@@ -55,7 +61,7 @@ public class GrokServiceImpl implements GrokService {
results = gm.toMap();
results.remove(patternLabel);
} catch (StringIndexOutOfBoundsException e) {
- throw new RestException("A pattern label must be included (ex. PATTERN_LABEL ${PATTERN:field} ...)", e.getCause());
+ throw new RestException("A pattern label must be included (eg. PATTERN_LABEL %{PATTERN:field} ...)", e.getCause());
} catch (Exception e) {
throw new RestException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
index 7246c2f..7b10dc4 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
@@ -18,10 +18,9 @@
package org.apache.metron.rest.service.impl;
import kafka.admin.AdminOperationException;
-import kafka.admin.AdminUtils;
+import kafka.admin.AdminUtils$;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
@@ -32,27 +31,30 @@ import org.apache.metron.rest.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@Service
public class KafkaServiceImpl implements KafkaService {
-
- @Autowired
private ZkUtils zkUtils;
-
- @Autowired
private KafkaConsumer<String, String> kafkaConsumer;
+ private AdminUtils$ adminUtils;
- private String offsetTopic = "__consumer_offsets";
+ @Autowired
+ public KafkaServiceImpl(ZkUtils zkUtils, KafkaConsumer<String, String> kafkaConsumer, AdminUtils$ adminUtils) {
+ this.zkUtils = zkUtils;
+ this.kafkaConsumer = kafkaConsumer;
+ this.adminUtils = adminUtils;
+ }
@Override
public KafkaTopic createTopic(KafkaTopic topic) throws RestException {
if (!listTopics().contains(topic.getName())) {
try {
- AdminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$);
+ adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$);
} catch (AdminOperationException e) {
throw new RestException(e);
}
@@ -62,8 +64,9 @@ public class KafkaServiceImpl implements KafkaService {
@Override
public boolean deleteTopic(String name) {
- if (listTopics().contains(name)) {
- AdminUtils.deleteTopic(zkUtils, name);
+ Set<String> topics = listTopics();
+ if (topics != null && topics.contains(name)) {
+ adminUtils.deleteTopic(zkUtils, name);
return true;
} else {
return false;
@@ -90,8 +93,9 @@ public class KafkaServiceImpl implements KafkaService {
public Set<String> listTopics() {
Set<String> topics;
synchronized (this) {
- topics = kafkaConsumer.listTopics().keySet();
- topics.remove(offsetTopic);
+ Map<String, List<PartitionInfo>> topicsInfo = kafkaConsumer.listTopics();
+ topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet();
+ topics.remove(CONSUMER_OFFSETS_TOPIC);
}
return topics;
}
@@ -101,20 +105,16 @@ public class KafkaServiceImpl implements KafkaService {
String message = null;
if (listTopics().contains(topic)) {
synchronized (this) {
- kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream().map(partitionInfo ->
- new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
- for (TopicPartition topicPartition : kafkaConsumer.assignment()) {
- long offset = kafkaConsumer.position(topicPartition) - 1;
- if (offset >= 0) {
- kafkaConsumer.seek(topicPartition, offset);
- }
- }
+ kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
+ .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
+ .collect(Collectors.toList()));
+
+ kafkaConsumer.assignment().stream()
+ .filter(p -> (kafkaConsumer.position(p) -1) >= 0)
+ .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
+
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
- Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
- if (iterator.hasNext()) {
- ConsumerRecord<String, String> record = iterator.next();
- message = record.value();
- }
+ message = records.isEmpty() ? null : records.iterator().next().value();
kafkaConsumer.unsubscribe();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index 7931fe6..edfd542 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -17,7 +17,7 @@
*/
package org.apache.metron.rest.config;
-import com.google.common.base.Function;
+import kafka.admin.AdminUtils$;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
@@ -38,7 +38,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.web.client.RestTemplate;
-import javax.annotation.Nullable;
import java.util.Properties;
import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
@@ -121,4 +120,9 @@ public class TestConfig {
restTemplate.setMockStormCLIClientWrapper((MockStormCLIClientWrapper) stormCLIClientWrapper);
return restTemplate;
}
+
+ @Bean
+ public AdminUtils$ adminUtils() {
+ return AdminUtils$.MODULE$;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java
index e618d48..4532616 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java
@@ -105,7 +105,7 @@ public class GrokControllerIntegrationTest {
.andExpect(status().isInternalServerError())
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
.andExpect(jsonPath("$.responseCode").value(500))
- .andExpect(jsonPath("$.message").value("A pattern label must be included (ex. PATTERN_LABEL ${PATTERN:field} ...)"));
+ .andExpect(jsonPath("$.message").value("A pattern label must be included (eg. PATTERN_LABEL %{PATTERN:field} ...)"));
this.mockMvc.perform(get(grokUrl + "/list").with(httpBasic(user,password)))
.andExpect(status().isOk())
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/HdfsServiceTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/HdfsServiceTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/HdfsServiceTest.java
deleted file mode 100644
index f7e43ab..0000000
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/HdfsServiceTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.rest.service;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.rest.config.HadoopConfig;
-import org.apache.metron.rest.service.impl.HdfsServiceImpl;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Profile;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={HadoopConfig.class, HdfsServiceTest.HdfsServiceTestContextConfiguration.class})
-@ActiveProfiles(TEST_PROFILE)
-public class HdfsServiceTest {
-
- @Configuration
- @Profile("test")
- static class HdfsServiceTestContextConfiguration {
-
- @Bean
- public HdfsService hdfsService() {
- return new HdfsServiceImpl();
- }
- }
-
- @Autowired
- private HdfsService hdfsService;
-
- @Test
- public void test() throws IOException {
- String rootDir = "./src/test/tmp";
- File rootFile = new File(rootDir);
- Path rootPath = new Path(rootDir);
- if (rootFile.exists()) {
- FileUtils.cleanDirectory(rootFile);
- FileUtils.deleteDirectory(rootFile);
- }
- assertEquals(true, rootFile.mkdir());
- String fileName1 = "fileName1";
- String fileName2 = "fileName2";
- Path path1 = new Path(rootDir, fileName1);
- String value1 = "value1";
- String value2 = "value2";
- Path path2 = new Path(rootDir, fileName2);
- String invalidFile = "invalidFile";
- Path pathInvalidFile = new Path(rootDir, invalidFile);
-
- FileStatus[] fileStatuses = hdfsService.list(new Path(rootDir));
- assertEquals(0, fileStatuses.length);
-
-
- hdfsService.write(path1, value1.getBytes());
- assertEquals(value1, FileUtils.readFileToString(new File(rootDir, fileName1)));
- assertEquals(value1, new String(hdfsService.read(path1)));
-
- fileStatuses = hdfsService.list(rootPath);
- assertEquals(1, fileStatuses.length);
- assertEquals(fileName1, fileStatuses[0].getPath().getName());
-
- hdfsService.write(path2, value2.getBytes());
- assertEquals(value2, FileUtils.readFileToString(new File(rootDir, fileName2)));
- assertEquals(value2, new String(hdfsService.read(path2)));
-
- fileStatuses = hdfsService.list(rootPath);
- assertEquals(2, fileStatuses.length);
- assertEquals(fileName1, fileStatuses[0].getPath().getName());
- assertEquals(fileName1, fileStatuses[0].getPath().getName());
-
- assertEquals(true, hdfsService.delete(path1, false));
- fileStatuses = hdfsService.list(rootPath);
- assertEquals(1, fileStatuses.length);
- assertEquals(fileName2, fileStatuses[0].getPath().getName());
- assertEquals(true, hdfsService.delete(path2, false));
- fileStatuses = hdfsService.list(rootPath);
- assertEquals(0, fileStatuses.length);
-
- try {
- hdfsService.read(pathInvalidFile);
- fail("Exception should be thrown when reading invalid file name");
- } catch(IOException e) {
- }
- assertEquals(false, hdfsService.delete(pathInvalidFile, false));
-
- FileUtils.deleteDirectory(new File(rootDir));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapperTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapperTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapperTest.java
new file mode 100644
index 0000000..1217bcb
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapperTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.core.env.Environment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyVararg;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@SuppressWarnings("unchecked")
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DockerStormCLIWrapper.class, ProcessBuilder.class})
+public class DockerStormCLIWrapperTest {
+ private ProcessBuilder processBuilder;
+ private Environment environment;
+ private DockerStormCLIWrapper dockerStormCLIWrapper;
+
+ @Before
+ public void setUp() throws Exception {
+ processBuilder = mock(ProcessBuilder.class);
+ environment = mock(Environment.class);
+
+ dockerStormCLIWrapper = new DockerStormCLIWrapper(environment);
+ }
+
+ @Test
+ public void getProcessBuilderShouldProperlyGenerateProcessorBuilder() throws Exception {
+ whenNew(ProcessBuilder.class).withParameterTypes(String[].class).withArguments(anyVararg()).thenReturn(processBuilder);
+
+ when(processBuilder.environment()).thenReturn(new HashMap<>());
+ when(processBuilder.command()).thenReturn(new ArrayList<>());
+
+ Process process = mock(Process.class);
+ InputStream inputStream = new InputStream() {
+ @Override
+ public int read() throws IOException {
+ return -1;
+ }
+ };
+
+ when(processBuilder.start()).thenReturn(process);
+ when(process.getInputStream()).thenReturn(inputStream);
+ when(environment.getProperty("docker.compose.path")).thenReturn("/test");
+ when(environment.getProperty("metron.version")).thenReturn("1");
+
+
+ ProcessBuilder actualBuilder = dockerStormCLIWrapper.getProcessBuilder("oo", "ooo");
+
+ assertEquals(new HashMap<String, String>() {{ put("METRON_VERSION", "1"); }}, actualBuilder.environment());
+ assertEquals(new ArrayList<>(), actualBuilder.command());
+
+ verify(process).waitFor();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
new file mode 100644
index 0000000..59d5957
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.DeleteBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.curator.framework.api.SetDataBuilder;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("ALL")
+public class GlobalConfigServiceImplTest {
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ CuratorFramework curatorFramework;
+ GlobalConfigService globalConfigService;
+
+ @Before
+ public void setUp() throws Exception {
+ curatorFramework = mock(CuratorFramework.class);
+ globalConfigService = new GlobalConfigServiceImpl(curatorFramework);
+ }
+
+
+ @Test
+ public void deleteShouldProperlyCatchNoNodeExceptionAndReturnFalse() throws Exception {
+ DeleteBuilder builder = mock(DeleteBuilder.class);
+
+ when(curatorFramework.delete()).thenReturn(builder);
+ when(builder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
+
+ assertFalse(globalConfigService.delete());
+ }
+
+ @Test
+ public void deleteShouldProperlyCatchNonNoNodeExceptionAndThrowRestException() throws Exception {
+ exception.expect(RestException.class);
+
+ DeleteBuilder builder = mock(DeleteBuilder.class);
+
+ when(curatorFramework.delete()).thenReturn(builder);
+ when(builder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(Exception.class);
+
+ assertFalse(globalConfigService.delete());
+ }
+
+ @Test
+ public void deleteShouldReturnTrueWhenClientSuccessfullyCallsDelete() throws Exception {
+ DeleteBuilder builder = mock(DeleteBuilder.class);
+
+ when(curatorFramework.delete()).thenReturn(builder);
+ when(builder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenReturn(null);
+
+ assertTrue(globalConfigService.delete());
+
+ verify(curatorFramework).delete();
+ }
+
+ @Test
+ public void getShouldProperlyReturnGlobalConfig() throws Exception {
+ final String config = "{\"k\":\"v\"}";
+ final Map<String, Object> configMap = new HashMap<String, Object>() {{
+ put("k", "v");
+ }};
+
+ GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+ when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenReturn(config.getBytes());
+
+ when(curatorFramework.getData()).thenReturn(getDataBuilder);
+
+ assertEquals(configMap, globalConfigService.get());
+ }
+
+ @Test
+ public void getShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
+ GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+ when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
+
+ when(curatorFramework.getData()).thenReturn(getDataBuilder);
+
+ assertNull(globalConfigService.get());
+ }
+
+ @Test
+ public void getShouldWrapNonNoNodeExceptionInRestException() throws Exception {
+ exception.expect(RestException.class);
+
+ GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+ when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(Exception.class);
+
+ when(curatorFramework.getData()).thenReturn(getDataBuilder);
+
+ globalConfigService.get();
+ }
+
+ @Test
+ public void saveShouldWrapExceptionInRestException() throws Exception {
+ exception.expect(RestException.class);
+
+ SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
+ when(setDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot(), "{}".getBytes())).thenThrow(Exception.class);
+
+ when(curatorFramework.setData()).thenReturn(setDataBuilder);
+
+ globalConfigService.save(new HashMap<>());
+ }
+
+ @Test
+ public void saveShouldReturnSameConfigThatIsPassedOnSuccessfulSave() throws Exception {
+ SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
+ when(setDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot(), "{}".getBytes())).thenReturn(new Stat());
+
+ when(curatorFramework.setData()).thenReturn(setDataBuilder);
+
+ assertEquals(new HashMap<>(), globalConfigService.save(new HashMap<>()));
+ verify(setDataBuilder).forPath(eq(ConfigurationType.GLOBAL.getZookeeperRoot()), eq("{}".getBytes()));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GrokServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GrokServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GrokServiceImplTest.java
new file mode 100644
index 0000000..7fc8748
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GrokServiceImplTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import oi.thekraken.grok.api.Grok;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.GrokValidation;
+import org.apache.metron.rest.service.GrokService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GrokServiceImplTest {
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ private Grok grok;
+ private GrokService grokService;
+
+ @Before
+ public void setUp() throws Exception {
+ grok = mock(Grok.class);
+ grokService = new GrokServiceImpl(grok);
+ }
+
+ @Test
+ public void getCommonGrokPattersShouldCallGrokToGetPatterns() throws Exception {
+ grokService.getCommonGrokPatterns();
+
+ verify(grok).getPatterns();
+ }
+
+ @Test
+ public void getCommonGrokPattersShouldCallGrokToGetPatternsAndNotAlterValue() throws Exception {
+ Map<String, String> patterns = new HashMap<String, String>() {{
+ put("k", "v");
+ put("k1", "v1");
+ }};
+
+ when(grok.getPatterns()).thenReturn(patterns);
+
+ assertEquals(patterns, grokService.getCommonGrokPatterns());
+ }
+
+ @Test
+ public void validateGrokStatementShouldThrowExceptionWithEmptyStringAsStatement() throws Exception {
+ exception.expect(RestException.class);
+ exception.expectMessage("A pattern label must be included (eg. PATTERN_LABEL %{PATTERN:field} ...)");
+
+ GrokValidation grokValidation = new GrokValidation();
+ grokValidation.setResults(new HashMap<>());
+ grokValidation.setSampleData("asdf asdf");
+ grokValidation.setStatement("");
+
+ grokService.validateGrokStatement(grokValidation);
+ }
+
+ @Test
+ public void validateGrokStatementShouldThrowExceptionWithNullStringAsStatement() throws Exception {
+ exception.expect(RestException.class);
+ exception.expectMessage("A pattern label must be included (eg. PATTERN_LABEL %{PATTERN:field} ...)");
+
+ GrokValidation grokValidation = new GrokValidation();
+ grokValidation.setResults(new HashMap<>());
+ grokValidation.setSampleData("asdf asdf");
+ grokValidation.setStatement(null);
+
+ grokService.validateGrokStatement(grokValidation);
+ }
+
+ @Test
+ public void validateGrokStatementShouldProperlyMatchSampleDataAgainstGivenStatement() throws Exception {
+ GrokValidation grokValidation = new GrokValidation();
+ grokValidation.setResults(new HashMap<>());
+ grokValidation.setSampleData("asdf asdf");
+ grokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+ GrokValidation expectedGrokValidation = new GrokValidation();
+ expectedGrokValidation.setResults(new HashMap<String, Object>() {{ put("word1", "asdf"); put("word2", "asdf"); }});
+ expectedGrokValidation.setSampleData("asdf asdf");
+ expectedGrokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+ assertEquals(expectedGrokValidation, grokService.validateGrokStatement(grokValidation));
+ }
+
+ @Test
+ public void validateGrokStatementShouldProperlyMatchNothingAgainstEmptyString() throws Exception {
+ GrokValidation grokValidation = new GrokValidation();
+ grokValidation.setResults(new HashMap<>());
+ grokValidation.setSampleData("");
+ grokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+ GrokValidation expectedGrokValidation = new GrokValidation();
+ expectedGrokValidation.setResults(new HashMap<>());
+ expectedGrokValidation.setSampleData("");
+ expectedGrokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+ assertEquals(expectedGrokValidation, grokService.validateGrokStatement(grokValidation));
+ }
+
+ @Test
+ public void validateGrokStatementShouldProperlyMatchNothingAgainstNullString() throws Exception {
+ GrokValidation grokValidation = new GrokValidation();
+ grokValidation.setResults(new HashMap<>());
+ grokValidation.setSampleData(null);
+ grokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+ GrokValidation expectedGrokValidation = new GrokValidation();
+ expectedGrokValidation.setResults(new HashMap<>());
+ expectedGrokValidation.setSampleData(null);
+ expectedGrokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+ assertEquals(expectedGrokValidation, grokService.validateGrokStatement(grokValidation));
+ }
+
+ @Test
+ public void invalidGrokStatementShouldThrowRestException() throws Exception {
+ exception.expect(RestException.class);
+
+ GrokValidation grokValidation = new GrokValidation();
+ grokValidation.setResults(new HashMap<>());
+ grokValidation.setSampleData(null);
+ grokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2");
+
+ grokService.validateGrokStatement(grokValidation);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/HdfsServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/HdfsServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/HdfsServiceImplTest.java
new file mode 100644
index 0000000..d67892e
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/HdfsServiceImplTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.rest.config.HadoopConfig;
+import org.apache.metron.rest.service.HdfsService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes={HadoopConfig.class, HdfsServiceImplTest.HdfsServiceTestContextConfiguration.class})
+@ActiveProfiles(TEST_PROFILE)
+public class HdfsServiceImplTest {
+
+ @Configuration
+ @Profile("test")
+ static class HdfsServiceTestContextConfiguration {
+
+ @Bean
+ public HdfsService hdfsService() {
+ return new HdfsServiceImpl();
+ }
+ }
+
+ @Autowired
+ private HdfsService hdfsService;
+
+ @Test
+ public void test() throws IOException {
+ String rootDir = "./src/test/tmp";
+ File rootFile = new File(rootDir);
+ Path rootPath = new Path(rootDir);
+ if (rootFile.exists()) {
+ FileUtils.cleanDirectory(rootFile);
+ FileUtils.deleteDirectory(rootFile);
+ }
+ assertEquals(true, rootFile.mkdir());
+ String fileName1 = "fileName1";
+ String fileName2 = "fileName2";
+ Path path1 = new Path(rootDir, fileName1);
+ String value1 = "value1";
+ String value2 = "value2";
+ Path path2 = new Path(rootDir, fileName2);
+ String invalidFile = "invalidFile";
+ Path pathInvalidFile = new Path(rootDir, invalidFile);
+
+ FileStatus[] fileStatuses = hdfsService.list(new Path(rootDir));
+ assertEquals(0, fileStatuses.length);
+
+
+ hdfsService.write(path1, value1.getBytes());
+ assertEquals(value1, FileUtils.readFileToString(new File(rootDir, fileName1)));
+ assertEquals(value1, new String(hdfsService.read(path1)));
+
+ fileStatuses = hdfsService.list(rootPath);
+ assertEquals(1, fileStatuses.length);
+ assertEquals(fileName1, fileStatuses[0].getPath().getName());
+
+ hdfsService.write(path2, value2.getBytes());
+ assertEquals(value2, FileUtils.readFileToString(new File(rootDir, fileName2)));
+ assertEquals(value2, new String(hdfsService.read(path2)));
+
+ fileStatuses = hdfsService.list(rootPath);
+ assertEquals(2, fileStatuses.length);
+ assertEquals(fileName1, fileStatuses[0].getPath().getName());
+ assertEquals(fileName1, fileStatuses[0].getPath().getName());
+
+ assertEquals(true, hdfsService.delete(path1, false));
+ fileStatuses = hdfsService.list(rootPath);
+ assertEquals(1, fileStatuses.length);
+ assertEquals(fileName2, fileStatuses[0].getPath().getName());
+ assertEquals(true, hdfsService.delete(path2, false));
+ fileStatuses = hdfsService.list(rootPath);
+ assertEquals(0, fileStatuses.length);
+
+ try {
+ hdfsService.read(pathInvalidFile);
+ fail("Exception should be thrown when reading invalid file name");
+ } catch(IOException e) {
+ }
+ assertEquals(false, hdfsService.delete(pathInvalidFile, false));
+
+ FileUtils.deleteDirectory(new File(rootDir));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
new file mode 100644
index 0000000..b211ee6
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import kafka.admin.AdminOperationException;
+import kafka.admin.AdminUtils$;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.KafkaTopic;
+import org.apache.metron.rest.service.KafkaService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+@SuppressWarnings("unchecked")
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*") // resolve classloader conflict
+@PrepareForTest({AdminUtils$.class})
+public class KafkaServiceImplTest {
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ private ZkUtils zkUtils;
+ private KafkaConsumer<String, String> kafkaConsumer;
+ private AdminUtils$ adminUtils;
+
+ private KafkaService kafkaService;
+
+ private static final KafkaTopic VALID_KAFKA_TOPIC = new KafkaTopic() {{
+ setReplicationFactor(2);
+ setNumPartitions(1);
+ setName("t");
+ setProperties(new Properties());
+ }};
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setUp() throws Exception {
+ zkUtils = mock(ZkUtils.class);
+ kafkaConsumer = mock(KafkaConsumer.class);
+ adminUtils = mock(AdminUtils$.class);
+
+ kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumer, adminUtils);
+ }
+
+ @Test
+ public void listTopicsHappyPathWithListTopicsReturningNull() throws Exception {
+ final Map<String, List<PartitionInfo>> topics = null;
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+ final Set<String> listedTopics = kafkaService.listTopics();
+
+ assertEquals(Sets.newHashSet(), listedTopics);
+
+ verifyZeroInteractions(zkUtils);
+ verify(kafkaConsumer).listTopics();
+ verifyNoMoreInteractions(kafkaConsumer, zkUtils, adminUtils);
+ }
+
+ @Test
+ public void listTopicsHappyPathWithListTopicsReturningEmptyMap() throws Exception {
+ final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+ final Set<String> listedTopics = kafkaService.listTopics();
+
+ assertEquals(Sets.newHashSet(), listedTopics);
+
+ verifyZeroInteractions(zkUtils);
+ verify(kafkaConsumer).listTopics();
+ verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+ }
+
+ @Test
+ public void listTopicsHappyPath() throws Exception {
+ final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+ topics.put("topic1", Lists.newArrayList());
+ topics.put("topic2", Lists.newArrayList());
+ topics.put("topic3", Lists.newArrayList());
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+ final Set<String> listedTopics = kafkaService.listTopics();
+
+ assertEquals(Sets.newHashSet("topic1", "topic2", "topic3"), listedTopics);
+
+ verifyZeroInteractions(zkUtils);
+ verify(kafkaConsumer).listTopics();
+ verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+ }
+
+ @Test
+ public void listTopicsShouldProperlyRemoveOffsetTopic() throws Exception {
+ final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+ topics.put("topic1", Lists.newArrayList());
+ topics.put("topic2", Lists.newArrayList());
+ topics.put("topic3", Lists.newArrayList());
+ topics.put("__consumer_offsets", Lists.newArrayList());
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+ final Set<String> listedTopics = kafkaService.listTopics();
+
+ assertEquals(Sets.newHashSet("topic1", "topic2", "topic3"), listedTopics);
+
+ verifyZeroInteractions(zkUtils);
+ verify(kafkaConsumer).listTopics();
+ verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+ }
+
+ @Test
+ public void deletingTopicThatDoesNotExistShouldReturnFalse() throws Exception {
+ when(kafkaConsumer.listTopics()).thenReturn(Maps.newHashMap());
+
+ assertFalse(kafkaService.deleteTopic("non_existent_topic"));
+
+ verifyZeroInteractions(zkUtils);
+ verify(kafkaConsumer).listTopics();
+ verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+ }
+
+ @Test
+ public void deletingTopicThatExistShouldReturnTrue() throws Exception {
+ final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+ topics.put("non_existent_topic", Lists.newArrayList());
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+ assertTrue(kafkaService.deleteTopic("non_existent_topic"));
+
+ verify(kafkaConsumer).listTopics();
+ verify(adminUtils).deleteTopic(zkUtils, "non_existent_topic");
+ verifyNoMoreInteractions(kafkaConsumer);
+ }
+
+ @Test
+ public void makeSureDeletingTopicReturnsFalseWhenNoTopicsExist() throws Exception {
+ final Map<String, List<PartitionInfo>> topics = null;
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+ assertFalse(kafkaService.deleteTopic("non_existent_topic"));
+
+ verify(kafkaConsumer).listTopics();
+ verifyNoMoreInteractions(kafkaConsumer);
+ }
+
+ @Test
+ public void getTopicShouldProperlyMapTopicToKafkaTopic() throws Exception {
+ final PartitionInfo partitionInfo = mock(PartitionInfo.class);
+ when(partitionInfo.replicas()).thenReturn(new Node[] {new Node(1, "host", 8080)});
+
+ final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+ topics.put("t", Lists.newArrayList(partitionInfo));
+ topics.put("t1", Lists.newArrayList());
+
+ final KafkaTopic expected = new KafkaTopic();
+ expected.setName("t");
+ expected.setNumPartitions(1);
+ expected.setReplicationFactor(1);
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+ when(kafkaConsumer.partitionsFor("t")).thenReturn(Lists.newArrayList(partitionInfo));
+
+ assertEquals(expected, kafkaService.getTopic("t"));
+ }
+
+ @Test
+ public void getTopicShouldProperlyHandleTopicsThatDontExist() throws Exception {
+ final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+ topics.put("t1", Lists.newArrayList());
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+ when(kafkaConsumer.partitionsFor("t")).thenReturn(Lists.newArrayList());
+
+ assertEquals(null, kafkaService.getTopic("t"));
+
+ verify(kafkaConsumer).listTopics();
+ verify(kafkaConsumer, times(0)).partitionsFor("t");
+ verifyZeroInteractions(zkUtils);
+ verifyNoMoreInteractions(kafkaConsumer);
+ }
+
+ @Test
+ public void createTopicShouldFailIfReplicationFactorIsGreaterThanAvailableBrokers() throws Exception {
+ final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+ kafkaService.createTopic(VALID_KAFKA_TOPIC);
+
+ verify(adminUtils).createTopic(eq(zkUtils), eq("t"), eq(1), eq(2), eq(new Properties()), eq(RackAwareMode.Disabled$.MODULE$));
+ verify(kafkaConsumer).listTopics();
+ verifyZeroInteractions(zkUtils);
+ }
+
+ @Test
+ public void whenAdminUtilsThrowsAdminOperationExceptionCreateTopicShouldProperlyWrapExceptionInRestException() throws Exception {
+ exception.expect(RestException.class);
+
+ final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+ topics.put("1", new ArrayList<>());
+
+ when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+ doThrow(AdminOperationException.class).when(adminUtils).createTopic(eq(zkUtils), eq("t"), eq(1), eq(2), eq(new Properties()), eq(RackAwareMode.Disabled$.MODULE$));
+
+ kafkaService.createTopic(VALID_KAFKA_TOPIC);
+ }
+
+ @Test
+ public void getSampleMessageProperlyReturnsAMessageFromAGivenKafkaTopic() throws Exception {
+ final String topicName = "t";
+ final Node host = new Node(1, "host", 8080);
+ final Node[] replicas = {host};
+ final List<PartitionInfo> partitionInfo = Lists.newArrayList(new PartitionInfo(topicName, 1, host, replicas, replicas));
+ final TopicPartition topicPartition = new TopicPartition(topicName, 1);
+ final List<TopicPartition> topicPartitions = Lists.newArrayList(topicPartition);
+ final Set<TopicPartition> topicPartitionsSet = Sets.newHashSet(topicPartitions);
+ final ConsumerRecords<String, String> records = new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>() {{
+ put(topicPartition, Lists.newArrayList(new ConsumerRecord<>(topicName, 1, 1, "k", "message")));
+ }});
+
+ when(kafkaConsumer.listTopics()).thenReturn(new HashMap<String, List<PartitionInfo>>() {{ put(topicName, Lists.newArrayList()); }});
+ when(kafkaConsumer.partitionsFor(eq(topicName))).thenReturn(partitionInfo);
+ when(kafkaConsumer.assignment()).thenReturn(topicPartitionsSet);
+ when(kafkaConsumer.position(topicPartition)).thenReturn(1L);
+ when(kafkaConsumer.poll(100)).thenReturn(records);
+
+ assertEquals("message", kafkaService.getSampleMessage(topicName));
+
+ verify(kafkaConsumer).assign(eq(topicPartitions));
+ verify(kafkaConsumer).assignment();
+ verify(kafkaConsumer).poll(100);
+ verify(kafkaConsumer).unsubscribe();
+ verify(kafkaConsumer, times(2)).position(topicPartition);
+ verify(kafkaConsumer).seek(topicPartition, 0);
+
+ verifyZeroInteractions(zkUtils, adminUtils);
+ }
+}