Posted to commits@metron.apache.org by ce...@apache.org on 2017/03/02 20:51:53 UTC

[07/10] incubator-metron git commit: METRON-503: Metron REST API this closes apache/incubator-metron#316

METRON-503: Metron REST API this closes apache/incubator-metron#316


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/a61dbcf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/a61dbcf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/a61dbcf7

Branch: refs/heads/master
Commit: a61dbcf7ed69768b2342b292460815e5eee0db07
Parents: 55b3e7e
Author: JJ <jj...@gmail.com>
Authored: Sun Jan 29 19:31:37 2017 -0600
Committer: cstella <ce...@gmail.com>
Committed: Thu Mar 2 10:56:56 2017 -0500

----------------------------------------------------------------------
 .../metron/rest/model/GrokValidation.java       |  20 ++
 .../apache/metron/rest/model/KafkaTopic.java    |  22 ++
 .../apache/metron/rest/config/KafkaConfig.java  |   8 +-
 .../apache/metron/rest/config/StormConfig.java  |   2 +-
 .../metron/rest/controller/GrokController.java  |   2 +-
 .../metron/rest/service/KafkaService.java       |   2 +-
 .../service/impl/DockerStormCLIWrapper.java     |  30 +-
 .../service/impl/GlobalConfigServiceImpl.java   |   5 +-
 .../rest/service/impl/GrokServiceImpl.java      |  14 +-
 .../rest/service/impl/KafkaServiceImpl.java     |  52 ++--
 .../apache/metron/rest/config/TestConfig.java   |   8 +-
 .../GrokControllerIntegrationTest.java          |   2 +-
 .../metron/rest/service/HdfsServiceTest.java    | 117 --------
 .../service/impl/DockerStormCLIWrapperTest.java |  83 ++++++
 .../impl/GlobalConfigServiceImplTest.java       | 154 ++++++++++
 .../rest/service/impl/GrokServiceImplTest.java  | 151 ++++++++++
 .../rest/service/impl/HdfsServiceImplTest.java  | 117 ++++++++
 .../rest/service/impl/KafkaServiceImplTest.java | 292 +++++++++++++++++++
 18 files changed, 913 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java
index 7fbcd34..ccd2c5c 100644
--- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/GrokValidation.java
@@ -52,4 +52,24 @@ public class GrokValidation {
     public void setResults(Map<String, Object> results) {
         this.results = results;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        GrokValidation that = (GrokValidation) o;
+
+        if (statement != null ? !statement.equals(that.statement) : that.statement != null) return false;
+        if (sampleData != null ? !sampleData.equals(that.sampleData) : that.sampleData != null) return false;
+        return results != null ? results.equals(that.results) : that.results == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = statement != null ? statement.hashCode() : 0;
+        result = 31 * result + (sampleData != null ? sampleData.hashCode() : 0);
+        result = 31 * result + (results != null ? results.hashCode() : 0);
+        return result;
+    }
 }
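
The equals() and hashCode() overrides added here turn GrokValidation into a value object: two instances compare equal when statement, sampleData and results all match. The new GrokServiceImplTest further down relies on this when it asserts on whole objects. A minimal sketch of that behaviour, using only the getters and setters visible in this commit:

import org.apache.metron.rest.model.GrokValidation;

import java.util.HashMap;
import java.util.Map;

public class GrokValidationEqualityExample {
  public static void main(String[] args) {
    Map<String, Object> results = new HashMap<>();
    results.put("word1", "asdf");

    GrokValidation a = new GrokValidation();
    a.setStatement("LABEL %{WORD:word1}");
    a.setSampleData("asdf");
    a.setResults(results);

    GrokValidation b = new GrokValidation();
    b.setStatement("LABEL %{WORD:word1}");
    b.setSampleData("asdf");
    b.setResults(new HashMap<>(results));

    // Field-by-field equality: equals() is true and the hash codes agree,
    // which is what assertEquals(expected, actual) relies on in the new tests.
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode());  // true
  }
}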

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java
index c8db9f6..55dd2b2 100644
--- a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java
+++ b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/KafkaTopic.java
@@ -57,4 +57,26 @@ public class KafkaTopic {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        KafkaTopic that = (KafkaTopic) o;
+
+        if (numPartitions != that.numPartitions) return false;
+        if (replicationFactor != that.replicationFactor) return false;
+        if (name != null ? !name.equals(that.name) : that.name != null) return false;
+        return properties != null ? properties.equals(that.properties) : that.properties == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = name != null ? name.hashCode() : 0;
+        result = 31 * result + numPartitions;
+        result = 31 * result + replicationFactor;
+        result = 31 * result + (properties != null ? properties.hashCode() : 0);
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
index 8044405..f6ff73c 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.rest.config;
 
+import kafka.admin.AdminUtils$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,7 +47,7 @@ public class KafkaConfig {
     return ZkUtils.apply(zkClient, false);
   }
 
-  @Bean(destroyMethod="close")
+  @Bean(destroyMethod = "close")
   public KafkaConsumer<String, String> kafkaConsumer() {
     Properties props = new Properties();
     props.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
@@ -58,4 +59,9 @@ public class KafkaConfig {
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     return new KafkaConsumer<>(props);
   }
+
+  @Bean
+  public AdminUtils$ adminUtils() {
+    return AdminUtils$.MODULE$;
+  }
 }
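
AdminUtils here is a Scala object, and AdminUtils$.MODULE$ is its singleton instance; exposing it as a Spring bean lets KafkaServiceImpl take it through its constructor instead of calling the static-style API directly, so tests can substitute a PowerMock mock (as KafkaServiceImplTest does below). A rough wiring sketch under those assumptions; the mocks are purely illustrative:

import kafka.admin.AdminUtils$;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.metron.rest.service.KafkaService;
import org.apache.metron.rest.service.impl.KafkaServiceImpl;

import static org.mockito.Mockito.mock;

public class KafkaServiceWiringSketch {
  @SuppressWarnings("unchecked")
  public static KafkaService build() {
    ZkUtils zkUtils = mock(ZkUtils.class);                               // illustrative mock
    KafkaConsumer<String, String> consumer = mock(KafkaConsumer.class);  // illustrative mock
    // In production Spring supplies AdminUtils$.MODULE$ from the bean above;
    // a test can hand a PowerMock mock of AdminUtils$ into the same constructor slot.
    return new KafkaServiceImpl(zkUtils, consumer, AdminUtils$.MODULE$);
  }
}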

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
index d6d0cff..7a61cbc 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
@@ -40,7 +40,7 @@ public class StormConfig {
   @Bean
   public StormCLIWrapper stormCLIClientWrapper() {
     if (Arrays.asList(environment.getActiveProfiles()).contains(DOCKER_PROFILE)) {
-      return new DockerStormCLIWrapper();
+      return new DockerStormCLIWrapper(environment);
     } else {
       return new StormCLIWrapper();
     }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java
index 2b155b1..d561897 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/GrokController.java
@@ -43,7 +43,7 @@ public class GrokController {
     @ApiOperation(value = "Applies a Grok statement to a sample message")
     @ApiResponse(message = "JSON results", code = 200)
     @RequestMapping(value = "/validate", method = RequestMethod.POST)
-    ResponseEntity<GrokValidation> post(@ApiParam(name="grokValidation", value="Object containing Grok statment and sample message", required=true)@RequestBody GrokValidation grokValidation) throws RestException {
+    ResponseEntity<GrokValidation> post(@ApiParam(name = "grokValidation", value = "Object containing Grok statement and sample message", required = true) @RequestBody GrokValidation grokValidation) throws RestException {
         return new ResponseEntity<>(grokService.validateGrokStatement(grokValidation), HttpStatus.OK);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
index e89b165..f3cd901 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java
@@ -23,6 +23,7 @@ import org.apache.metron.rest.model.KafkaTopic;
 import java.util.Set;
 
 public interface KafkaService {
+    String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
 
     KafkaTopic createTopic(KafkaTopic topic) throws RestException;
 
@@ -33,5 +34,4 @@ public interface KafkaService {
     Set<String> listTopics();
 
     String getSampleMessage(String topic);
-
 }
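
Promoting the "__consumer_offsets" literal to the CONSUMER_OFFSETS_TOPIC constant gives the implementation and its tests a shared name for Kafka's internal offsets topic, which listTopics() filters out. A small illustrative sketch of that filtering (the real logic lives in KafkaServiceImpl.listTopics() below):

import org.apache.metron.rest.service.KafkaService;

import java.util.HashSet;
import java.util.Set;

public class OffsetsTopicFilterSketch {
  // Strip Kafka's internal offsets topic from a raw topic listing,
  // the same way KafkaServiceImpl.listTopics() does with the new constant.
  public static Set<String> userTopics(Set<String> rawTopics) {
    Set<String> topics = new HashSet<>(rawTopics);
    topics.remove(KafkaService.CONSUMER_OFFSETS_TOPIC);
    return topics;
  }
}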

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
index d7bbb23..059afd7 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapper.java
@@ -30,30 +30,34 @@ import java.util.Map;
 
 public class DockerStormCLIWrapper extends StormCLIWrapper {
 
-  private Logger LOG = LoggerFactory.getLogger(DockerStormCLIWrapper.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DockerStormCLIWrapper.class);
 
-  @Autowired
   private Environment environment;
 
+  @Autowired
+  public DockerStormCLIWrapper(final Environment environment) {
+    this.environment = environment;
+  }
+
   @Override
-  protected ProcessBuilder getProcessBuilder(String... command) {
-    String[] dockerCommand = {"docker-compose", "-f", environment.getProperty("docker.compose.path"), "-p", "metron", "exec", "storm"};
-    ProcessBuilder pb = new ProcessBuilder(ArrayUtils.addAll(dockerCommand, command));
-    Map<String, String> pbEnvironment = pb.environment();
+  protected ProcessBuilder getProcessBuilder(final String... command) {
+    final String[] dockerCommand = {"docker-compose", "-f", environment.getProperty("docker.compose.path"), "-p", "metron", "exec", "storm"};
+    final ProcessBuilder pb = new ProcessBuilder(ArrayUtils.addAll(dockerCommand, command));
+    final Map<String, String> pbEnvironment = pb.environment();
     pbEnvironment.put("METRON_VERSION", environment.getProperty("metron.version"));
     setDockerEnvironment(pbEnvironment);
     return pb;
   }
 
-  protected void setDockerEnvironment(Map<String, String> environmentVariables) {
-    ProcessBuilder pb = getDockerEnvironmentProcessBuilder();
+  private void setDockerEnvironment(final Map<String, String> environmentVariables) {
+    final ProcessBuilder pb = getDockerEnvironmentProcessBuilder();
     try {
-      Process process = pb.start();
-      BufferedReader inputStream = new BufferedReader(new InputStreamReader(process.getInputStream()));
+      final Process process = pb.start();
+      final BufferedReader inputStream = new BufferedReader(new InputStreamReader(process.getInputStream()));
       String line;
-      while((line = inputStream.readLine()) != null) {
+      while ((line = inputStream.readLine()) != null) {
         if (line.startsWith("export")) {
-          String[] parts = line.replaceFirst("export ", "").split("=");
+          final String[] parts = line.replaceFirst("export ", "").split("=");
           environmentVariables.put(parts[0], parts[1].replaceAll("\"", ""));
         }
       }
@@ -63,7 +67,7 @@ public class DockerStormCLIWrapper extends StormCLIWrapper {
     }
   }
 
-  protected ProcessBuilder getDockerEnvironmentProcessBuilder() {
+  private ProcessBuilder getDockerEnvironmentProcessBuilder() {
     String[] command = {"docker-machine", "env", "metron-machine"};
     return new ProcessBuilder(command);
   }
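
Replacing field injection with constructor injection is what makes this wrapper testable without a Spring context: it can be built directly around a mocked Environment, exactly as the new DockerStormCLIWrapperTest does. A minimal sketch, assuming Mockito is available; the property values are illustrative only:

import org.apache.metron.rest.service.impl.DockerStormCLIWrapper;
import org.springframework.core.env.Environment;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DockerWrapperConstructionSketch {
  public static DockerStormCLIWrapper build() {
    // No @Autowired field to populate reflectively; the dependency is explicit.
    Environment environment = mock(Environment.class);
    when(environment.getProperty("docker.compose.path")).thenReturn("/tmp/docker-compose.yml");
    when(environment.getProperty("metron.version")).thenReturn("0.3.1");
    return new DockerStormCLIWrapper(environment);
  }
}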

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
index 54c331a..e80380b 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java
@@ -33,9 +33,12 @@ import java.util.Map;
 
 @Service
 public class GlobalConfigServiceImpl implements GlobalConfigService {
+    private CuratorFramework client;
 
     @Autowired
-    private CuratorFramework client;
+    public GlobalConfigServiceImpl(CuratorFramework client) {
+      this.client = client;
+    }
 
     @Override
     public Map<String, Object> save(Map<String, Object> globalConfig) throws RestException {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
index 8fbea13..323ca78 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.metron.rest.service.impl;
 
 import oi.thekraken.grok.api.Grok;
 import oi.thekraken.grok.api.Match;
+import org.apache.directory.api.util.Strings;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.GrokValidation;
 import org.apache.metron.rest.service.GrokService;
@@ -31,9 +32,12 @@ import java.util.Map;
 
 @Service
 public class GrokServiceImpl implements GrokService {
+    private Grok commonGrok;
 
     @Autowired
-    private Grok commonGrok;
+    public GrokServiceImpl(Grok commonGrok) {
+        this.commonGrok = commonGrok;
+    }
 
     @Override
     public Map<String, String> getCommonGrokPatterns() {
@@ -44,10 +48,12 @@ public class GrokServiceImpl implements GrokService {
     public GrokValidation validateGrokStatement(GrokValidation grokValidation) throws RestException {
         Map<String, Object> results;
         try {
+            String statement = Strings.isEmpty(grokValidation.getStatement()) ? "" : grokValidation.getStatement();
+
             Grok grok = new Grok();
             grok.addPatternFromReader(new InputStreamReader(getClass().getResourceAsStream("/patterns/common")));
-            grok.addPatternFromReader(new StringReader(grokValidation.getStatement()));
-            String patternLabel = grokValidation.getStatement().substring(0, grokValidation.getStatement().indexOf(" "));
+            grok.addPatternFromReader(new StringReader(statement));
+            String patternLabel = statement.substring(0, statement.indexOf(" "));
             String grokPattern = "%{" + patternLabel + "}";
             grok.compile(grokPattern);
             Match gm = grok.match(grokValidation.getSampleData());
@@ -55,7 +61,7 @@ public class GrokServiceImpl implements GrokService {
             results = gm.toMap();
             results.remove(patternLabel);
         } catch (StringIndexOutOfBoundsException e) {
-            throw new RestException("A pattern label must be included (ex. PATTERN_LABEL ${PATTERN:field} ...)", e.getCause());
+            throw new RestException("A pattern label must be included (eg. PATTERN_LABEL %{PATTERN:field} ...)", e.getCause());
         } catch (Exception e) {
             throw new RestException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
index 7246c2f..7b10dc4 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java
@@ -18,10 +18,9 @@
 package org.apache.metron.rest.service.impl;
 
 import kafka.admin.AdminOperationException;
-import kafka.admin.AdminUtils;
+import kafka.admin.AdminUtils$;
 import kafka.admin.RackAwareMode;
 import kafka.utils.ZkUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -32,27 +31,30 @@ import org.apache.metron.rest.service.KafkaService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 @Service
 public class KafkaServiceImpl implements KafkaService {
-
-    @Autowired
     private ZkUtils zkUtils;
-
-    @Autowired
     private KafkaConsumer<String, String> kafkaConsumer;
+    private AdminUtils$ adminUtils;
 
-    private String offsetTopic = "__consumer_offsets";
+    @Autowired
+    public KafkaServiceImpl(ZkUtils zkUtils, KafkaConsumer<String, String> kafkaConsumer, AdminUtils$ adminUtils) {
+        this.zkUtils = zkUtils;
+        this.kafkaConsumer = kafkaConsumer;
+        this.adminUtils = adminUtils;
+    }
 
     @Override
     public KafkaTopic createTopic(KafkaTopic topic) throws RestException {
         if (!listTopics().contains(topic.getName())) {
           try {
-              AdminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$);
+              adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$);
           } catch (AdminOperationException e) {
               throw new RestException(e);
           }
@@ -62,8 +64,9 @@ public class KafkaServiceImpl implements KafkaService {
 
     @Override
     public boolean deleteTopic(String name) {
-        if (listTopics().contains(name)) {
-            AdminUtils.deleteTopic(zkUtils, name);
+        Set<String> topics = listTopics();
+        if (topics != null && topics.contains(name)) {
+            adminUtils.deleteTopic(zkUtils, name);
             return true;
         } else {
             return false;
@@ -90,8 +93,9 @@ public class KafkaServiceImpl implements KafkaService {
     public Set<String> listTopics() {
         Set<String> topics;
         synchronized (this) {
-            topics = kafkaConsumer.listTopics().keySet();
-            topics.remove(offsetTopic);
+            Map<String, List<PartitionInfo>> topicsInfo = kafkaConsumer.listTopics();
+            topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet();
+            topics.remove(CONSUMER_OFFSETS_TOPIC);
         }
         return topics;
     }
@@ -101,20 +105,16 @@ public class KafkaServiceImpl implements KafkaService {
         String message = null;
         if (listTopics().contains(topic)) {
             synchronized (this) {
-                kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream().map(partitionInfo ->
-                        new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
-                for (TopicPartition topicPartition : kafkaConsumer.assignment()) {
-                    long offset = kafkaConsumer.position(topicPartition) - 1;
-                    if (offset >= 0) {
-                        kafkaConsumer.seek(topicPartition, offset);
-                    }
-                }
+                kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream()
+                    .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
+                    .collect(Collectors.toList()));
+
+                kafkaConsumer.assignment().stream()
+                    .filter(p -> (kafkaConsumer.position(p) -1) >= 0)
+                    .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1));
+
                 ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
-                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
-                if (iterator.hasNext()) {
-                    ConsumerRecord<String, String> record = iterator.next();
-                    message = record.value();
-                }
+                message  = records.isEmpty() ? null : records.iterator().next().value();
                 kafkaConsumer.unsubscribe();
             }
         }
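
The reworked getSampleMessage() is the usual "read the latest record" pattern with the consumer API: assign every partition of the topic, step each position back by one, poll once, and take the first record if any arrives. A standalone sketch of that pattern against a plain KafkaConsumer; the class name and poll timeout are illustrative:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.stream.Collectors;

public class LatestMessageSketch {
  public static String latestMessage(KafkaConsumer<String, String> consumer, String topic) {
    // Take ownership of every partition of the topic.
    consumer.assign(consumer.partitionsFor(topic).stream()
        .map(info -> new TopicPartition(topic, info.partition()))
        .collect(Collectors.toList()));

    // Rewind each partition by one record so the next poll re-reads the newest message.
    consumer.assignment().stream()
        .filter(p -> consumer.position(p) - 1 >= 0)
        .forEach(p -> consumer.seek(p, consumer.position(p) - 1));

    ConsumerRecords<String, String> records = consumer.poll(100);
    String message = records.isEmpty() ? null : records.iterator().next().value();
    consumer.unsubscribe();
    return message;
  }
}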

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index 7931fe6..edfd542 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -17,7 +17,7 @@
  */
 package org.apache.metron.rest.config;
 
-import com.google.common.base.Function;
+import kafka.admin.AdminUtils$;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
@@ -38,7 +38,6 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Profile;
 import org.springframework.web.client.RestTemplate;
 
-import javax.annotation.Nullable;
 import java.util.Properties;
 
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
@@ -121,4 +120,9 @@ public class TestConfig {
     restTemplate.setMockStormCLIClientWrapper((MockStormCLIClientWrapper) stormCLIClientWrapper);
     return restTemplate;
   }
+
+  @Bean
+  public AdminUtils$ adminUtils() {
+    return AdminUtils$.MODULE$;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java
index e618d48..4532616 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GrokControllerIntegrationTest.java
@@ -105,7 +105,7 @@ public class GrokControllerIntegrationTest {
                 .andExpect(status().isInternalServerError())
                 .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
                 .andExpect(jsonPath("$.responseCode").value(500))
-                .andExpect(jsonPath("$.message").value("A pattern label must be included (ex. PATTERN_LABEL ${PATTERN:field} ...)"));
+                .andExpect(jsonPath("$.message").value("A pattern label must be included (eg. PATTERN_LABEL %{PATTERN:field} ...)"));
 
         this.mockMvc.perform(get(grokUrl + "/list").with(httpBasic(user,password)))
                 .andExpect(status().isOk())

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/HdfsServiceTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/HdfsServiceTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/HdfsServiceTest.java
deleted file mode 100644
index f7e43ab..0000000
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/HdfsServiceTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.rest.service;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.metron.rest.config.HadoopConfig;
-import org.apache.metron.rest.service.impl.HdfsServiceImpl;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Profile;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@RunWith(SpringJUnit4ClassRunner.class)
-@ContextConfiguration(classes={HadoopConfig.class, HdfsServiceTest.HdfsServiceTestContextConfiguration.class})
-@ActiveProfiles(TEST_PROFILE)
-public class HdfsServiceTest {
-
-    @Configuration
-    @Profile("test")
-    static class HdfsServiceTestContextConfiguration {
-
-        @Bean
-        public HdfsService hdfsService() {
-            return new HdfsServiceImpl();
-        }
-    }
-
-    @Autowired
-    private HdfsService hdfsService;
-
-    @Test
-    public void test() throws IOException {
-        String rootDir = "./src/test/tmp";
-        File rootFile = new File(rootDir);
-        Path rootPath = new Path(rootDir);
-        if (rootFile.exists()) {
-            FileUtils.cleanDirectory(rootFile);
-            FileUtils.deleteDirectory(rootFile);
-        }
-        assertEquals(true, rootFile.mkdir());
-        String fileName1 = "fileName1";
-        String fileName2 = "fileName2";
-        Path path1 = new Path(rootDir, fileName1);
-        String value1 = "value1";
-        String value2 = "value2";
-        Path path2 = new Path(rootDir, fileName2);
-        String invalidFile = "invalidFile";
-        Path pathInvalidFile = new Path(rootDir, invalidFile);
-
-        FileStatus[] fileStatuses = hdfsService.list(new Path(rootDir));
-        assertEquals(0, fileStatuses.length);
-
-
-        hdfsService.write(path1, value1.getBytes());
-        assertEquals(value1, FileUtils.readFileToString(new File(rootDir, fileName1)));
-        assertEquals(value1, new String(hdfsService.read(path1)));
-
-        fileStatuses = hdfsService.list(rootPath);
-        assertEquals(1, fileStatuses.length);
-        assertEquals(fileName1, fileStatuses[0].getPath().getName());
-
-        hdfsService.write(path2, value2.getBytes());
-        assertEquals(value2, FileUtils.readFileToString(new File(rootDir, fileName2)));
-        assertEquals(value2, new String(hdfsService.read(path2)));
-
-        fileStatuses = hdfsService.list(rootPath);
-        assertEquals(2, fileStatuses.length);
-        assertEquals(fileName1, fileStatuses[0].getPath().getName());
-        assertEquals(fileName1, fileStatuses[0].getPath().getName());
-
-        assertEquals(true, hdfsService.delete(path1, false));
-        fileStatuses = hdfsService.list(rootPath);
-        assertEquals(1, fileStatuses.length);
-        assertEquals(fileName2, fileStatuses[0].getPath().getName());
-        assertEquals(true, hdfsService.delete(path2, false));
-        fileStatuses = hdfsService.list(rootPath);
-        assertEquals(0, fileStatuses.length);
-
-        try {
-            hdfsService.read(pathInvalidFile);
-            fail("Exception should be thrown when reading invalid file name");
-        } catch(IOException e) {
-        }
-        assertEquals(false, hdfsService.delete(pathInvalidFile, false));
-
-        FileUtils.deleteDirectory(new File(rootDir));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapperTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapperTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapperTest.java
new file mode 100644
index 0000000..1217bcb
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/DockerStormCLIWrapperTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.core.env.Environment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyVararg;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@SuppressWarnings("unchecked")
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DockerStormCLIWrapper.class, ProcessBuilder.class})
+public class DockerStormCLIWrapperTest {
+  private ProcessBuilder processBuilder;
+  private Environment environment;
+  private DockerStormCLIWrapper dockerStormCLIWrapper;
+
+  @Before
+  public void setUp() throws Exception {
+    processBuilder = mock(ProcessBuilder.class);
+    environment = mock(Environment.class);
+
+    dockerStormCLIWrapper = new DockerStormCLIWrapper(environment);
+  }
+
+  @Test
+  public void getProcessBuilderShouldProperlyGenerateProcessorBuilder() throws Exception {
+    whenNew(ProcessBuilder.class).withParameterTypes(String[].class).withArguments(anyVararg()).thenReturn(processBuilder);
+
+    when(processBuilder.environment()).thenReturn(new HashMap<>());
+    when(processBuilder.command()).thenReturn(new ArrayList<>());
+
+    Process process = mock(Process.class);
+    InputStream inputStream = new InputStream() {
+      @Override
+      public int read() throws IOException {
+        return -1;
+      }
+    };
+
+    when(processBuilder.start()).thenReturn(process);
+    when(process.getInputStream()).thenReturn(inputStream);
+    when(environment.getProperty("docker.compose.path")).thenReturn("/test");
+    when(environment.getProperty("metron.version")).thenReturn("1");
+
+
+    ProcessBuilder actualBuilder = dockerStormCLIWrapper.getProcessBuilder("oo", "ooo");
+
+    assertEquals(new HashMap<String, String>() {{ put("METRON_VERSION", "1"); }}, actualBuilder.environment());
+    assertEquals(new ArrayList<>(), actualBuilder.command());
+
+    verify(process).waitFor();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
new file mode 100644
index 0000000..59d5957
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.DeleteBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.curator.framework.api.SetDataBuilder;
+import org.apache.metron.common.configuration.ConfigurationType;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.service.GlobalConfigService;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("ALL")
+public class GlobalConfigServiceImplTest {
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  CuratorFramework curatorFramework;
+  GlobalConfigService globalConfigService;
+
+  @Before
+  public void setUp() throws Exception {
+    curatorFramework = mock(CuratorFramework.class);
+    globalConfigService = new GlobalConfigServiceImpl(curatorFramework);
+  }
+
+
+  @Test
+  public void deleteShouldProperlyCatchNoNodeExceptionAndReturnFalse() throws Exception {
+    DeleteBuilder builder = mock(DeleteBuilder.class);
+
+    when(curatorFramework.delete()).thenReturn(builder);
+    when(builder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
+
+    assertFalse(globalConfigService.delete());
+  }
+
+  @Test
+  public void deleteShouldProperlyCatchNonNoNodeExceptionAndThrowRestException() throws Exception {
+    exception.expect(RestException.class);
+
+    DeleteBuilder builder = mock(DeleteBuilder.class);
+
+    when(curatorFramework.delete()).thenReturn(builder);
+    when(builder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(Exception.class);
+
+    assertFalse(globalConfigService.delete());
+  }
+
+  @Test
+  public void deleteShouldReturnTrueWhenClientSuccessfullyCallsDelete() throws Exception {
+    DeleteBuilder builder = mock(DeleteBuilder.class);
+
+    when(curatorFramework.delete()).thenReturn(builder);
+    when(builder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenReturn(null);
+
+    assertTrue(globalConfigService.delete());
+
+    verify(curatorFramework).delete();
+  }
+
+  @Test
+  public void getShouldProperlyReturnGlobalConfig() throws Exception {
+    final String config = "{\"k\":\"v\"}";
+    final Map<String, Object> configMap = new HashMap<String, Object>() {{
+      put("k", "v");
+    }};
+
+    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+    when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenReturn(config.getBytes());
+
+    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+
+    assertEquals(configMap, globalConfigService.get());
+  }
+
+  @Test
+  public void getShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception {
+    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+    when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class);
+
+    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+
+    assertNull(globalConfigService.get());
+  }
+
+  @Test
+  public void getShouldWrapNonNoNodeExceptionInRestException() throws Exception {
+    exception.expect(RestException.class);
+
+    GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+    when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(Exception.class);
+
+    when(curatorFramework.getData()).thenReturn(getDataBuilder);
+
+    globalConfigService.get();
+  }
+
+  @Test
+  public void saveShouldWrapExceptionInRestException() throws Exception {
+    exception.expect(RestException.class);
+
+    SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
+    when(setDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot(), "{}".getBytes())).thenThrow(Exception.class);
+
+    when(curatorFramework.setData()).thenReturn(setDataBuilder);
+
+    globalConfigService.save(new HashMap<>());
+  }
+
+  @Test
+  public void saveShouldReturnSameConfigThatIsPassedOnSuccessfulSave() throws Exception {
+    SetDataBuilder setDataBuilder = mock(SetDataBuilder.class);
+    when(setDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot(), "{}".getBytes())).thenReturn(new Stat());
+
+    when(curatorFramework.setData()).thenReturn(setDataBuilder);
+
+    assertEquals(new HashMap<>(), globalConfigService.save(new HashMap<>()));
+    verify(setDataBuilder).forPath(eq(ConfigurationType.GLOBAL.getZookeeperRoot()), eq("{}".getBytes()));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GrokServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GrokServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GrokServiceImplTest.java
new file mode 100644
index 0000000..7fc8748
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GrokServiceImplTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import oi.thekraken.grok.api.Grok;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.GrokValidation;
+import org.apache.metron.rest.service.GrokService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class GrokServiceImplTest {
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  private Grok grok;
+  private GrokService grokService;
+
+  @Before
+  public void setUp() throws Exception {
+    grok = mock(Grok.class);
+    grokService = new GrokServiceImpl(grok);
+  }
+
+  @Test
+  public void getCommonGrokPattersShouldCallGrokToGetPatterns() throws Exception {
+    grokService.getCommonGrokPatterns();
+
+    verify(grok).getPatterns();
+  }
+
+  @Test
+  public void getCommonGrokPattersShouldCallGrokToGetPatternsAndNotAlterValue() throws Exception {
+    Map<String, String> patterns = new HashMap<String, String>() {{
+      put("k", "v");
+      put("k1", "v1");
+    }};
+
+    when(grok.getPatterns()).thenReturn(patterns);
+
+    assertEquals(patterns, grokService.getCommonGrokPatterns());
+  }
+
+  @Test
+  public void validateGrokStatementShouldThrowExceptionWithEmptyStringAsStatement() throws Exception {
+    exception.expect(RestException.class);
+    exception.expectMessage("A pattern label must be included (eg. PATTERN_LABEL %{PATTERN:field} ...)");
+
+    GrokValidation grokValidation = new GrokValidation();
+    grokValidation.setResults(new HashMap<>());
+    grokValidation.setSampleData("asdf asdf");
+    grokValidation.setStatement("");
+
+    grokService.validateGrokStatement(grokValidation);
+  }
+
+  @Test
+  public void validateGrokStatementShouldThrowExceptionWithNullStringAsStatement() throws Exception {
+    exception.expect(RestException.class);
+    exception.expectMessage("A pattern label must be included (eg. PATTERN_LABEL %{PATTERN:field} ...)");
+
+    GrokValidation grokValidation = new GrokValidation();
+    grokValidation.setResults(new HashMap<>());
+    grokValidation.setSampleData("asdf asdf");
+    grokValidation.setStatement(null);
+
+    grokService.validateGrokStatement(grokValidation);
+  }
+
+  @Test
+  public void validateGrokStatementShouldProperlyMatchSampleDataAgainstGivenStatement() throws Exception {
+    GrokValidation grokValidation = new GrokValidation();
+    grokValidation.setResults(new HashMap<>());
+    grokValidation.setSampleData("asdf asdf");
+    grokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+    GrokValidation expectedGrokValidation = new GrokValidation();
+    expectedGrokValidation.setResults(new HashMap<String, Object>() {{ put("word1", "asdf"); put("word2", "asdf"); }});
+    expectedGrokValidation.setSampleData("asdf asdf");
+    expectedGrokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+    assertEquals(expectedGrokValidation, grokService.validateGrokStatement(grokValidation));
+  }
+
+  @Test
+  public void validateGrokStatementShouldProperlyMatchNothingAgainstEmptyString() throws Exception {
+    GrokValidation grokValidation = new GrokValidation();
+    grokValidation.setResults(new HashMap<>());
+    grokValidation.setSampleData("");
+    grokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+    GrokValidation expectedGrokValidation = new GrokValidation();
+    expectedGrokValidation.setResults(new HashMap<>());
+    expectedGrokValidation.setSampleData("");
+    expectedGrokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+    assertEquals(expectedGrokValidation, grokService.validateGrokStatement(grokValidation));
+  }
+
+  @Test
+  public void validateGrokStatementShouldProperlyMatchNothingAgainstNullString() throws Exception {
+    GrokValidation grokValidation = new GrokValidation();
+    grokValidation.setResults(new HashMap<>());
+    grokValidation.setSampleData(null);
+    grokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+    GrokValidation expectedGrokValidation = new GrokValidation();
+    expectedGrokValidation.setResults(new HashMap<>());
+    expectedGrokValidation.setSampleData(null);
+    expectedGrokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2}");
+
+    assertEquals(expectedGrokValidation, grokService.validateGrokStatement(grokValidation));
+  }
+
+  @Test
+  public void invalidGrokStatementShouldThrowRestException() throws Exception {
+    exception.expect(RestException.class);
+
+    GrokValidation grokValidation = new GrokValidation();
+    grokValidation.setResults(new HashMap<>());
+    grokValidation.setSampleData(null);
+    grokValidation.setStatement("LABEL %{WORD:word1} %{WORD:word2");
+
+    grokService.validateGrokStatement(grokValidation);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/HdfsServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/HdfsServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/HdfsServiceImplTest.java
new file mode 100644
index 0000000..d67892e
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/HdfsServiceImplTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.metron.rest.config.HadoopConfig;
+import org.apache.metron.rest.service.HdfsService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes={HadoopConfig.class, HdfsServiceImplTest.HdfsServiceTestContextConfiguration.class})
+@ActiveProfiles(TEST_PROFILE)
+public class HdfsServiceImplTest {
+
+    @Configuration
+    @Profile("test")
+    static class HdfsServiceTestContextConfiguration {
+
+        @Bean
+        public HdfsService hdfsService() {
+            return new HdfsServiceImpl();
+        }
+    }
+
+    @Autowired
+    private HdfsService hdfsService;
+
+    @Test
+    public void test() throws IOException {
+        String rootDir = "./src/test/tmp";
+        File rootFile = new File(rootDir);
+        Path rootPath = new Path(rootDir);
+        if (rootFile.exists()) {
+            FileUtils.cleanDirectory(rootFile);
+            FileUtils.deleteDirectory(rootFile);
+        }
+        assertEquals(true, rootFile.mkdir());
+        String fileName1 = "fileName1";
+        String fileName2 = "fileName2";
+        Path path1 = new Path(rootDir, fileName1);
+        String value1 = "value1";
+        String value2 = "value2";
+        Path path2 = new Path(rootDir, fileName2);
+        String invalidFile = "invalidFile";
+        Path pathInvalidFile = new Path(rootDir, invalidFile);
+
+        FileStatus[] fileStatuses = hdfsService.list(new Path(rootDir));
+        assertEquals(0, fileStatuses.length);
+
+
+        hdfsService.write(path1, value1.getBytes());
+        assertEquals(value1, FileUtils.readFileToString(new File(rootDir, fileName1)));
+        assertEquals(value1, new String(hdfsService.read(path1)));
+
+        fileStatuses = hdfsService.list(rootPath);
+        assertEquals(1, fileStatuses.length);
+        assertEquals(fileName1, fileStatuses[0].getPath().getName());
+
+        hdfsService.write(path2, value2.getBytes());
+        assertEquals(value2, FileUtils.readFileToString(new File(rootDir, fileName2)));
+        assertEquals(value2, new String(hdfsService.read(path2)));
+
+        fileStatuses = hdfsService.list(rootPath);
+        assertEquals(2, fileStatuses.length);
+        assertEquals(fileName1, fileStatuses[0].getPath().getName());
+        assertEquals(fileName1, fileStatuses[0].getPath().getName());
+
+        assertEquals(true, hdfsService.delete(path1, false));
+        fileStatuses = hdfsService.list(rootPath);
+        assertEquals(1, fileStatuses.length);
+        assertEquals(fileName2, fileStatuses[0].getPath().getName());
+        assertEquals(true, hdfsService.delete(path2, false));
+        fileStatuses = hdfsService.list(rootPath);
+        assertEquals(0, fileStatuses.length);
+
+        try {
+            hdfsService.read(pathInvalidFile);
+            fail("Exception should be thrown when reading invalid file name");
+        } catch(IOException e) {
+        }
+        assertEquals(false, hdfsService.delete(pathInvalidFile, false));
+
+        FileUtils.deleteDirectory(new File(rootDir));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/a61dbcf7/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
new file mode 100644
index 0000000..b211ee6
--- /dev/null
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.rest.service.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import kafka.admin.AdminOperationException;
+import kafka.admin.AdminUtils$;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZkUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.metron.rest.RestException;
+import org.apache.metron.rest.model.KafkaTopic;
+import org.apache.metron.rest.service.KafkaService;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+@SuppressWarnings("unchecked")
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*") // resolve classloader conflict
+@PrepareForTest({AdminUtils$.class})
+public class KafkaServiceImplTest {
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  private ZkUtils zkUtils;
+  private KafkaConsumer<String, String> kafkaConsumer;
+  private AdminUtils$ adminUtils;
+
+  private KafkaService kafkaService;
+
+  private static final KafkaTopic VALID_KAFKA_TOPIC = new KafkaTopic() {{
+    setReplicationFactor(2);
+    setNumPartitions(1);
+    setName("t");
+    setProperties(new Properties());
+  }};
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() throws Exception {
+    zkUtils = mock(ZkUtils.class);
+    kafkaConsumer = mock(KafkaConsumer.class);
+    adminUtils = mock(AdminUtils$.class);
+
+    kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumer, adminUtils);
+  }
+
+  @Test
+  public void listTopicsHappyPathWithListTopicsReturningNull() throws Exception {
+    final Map<String, List<PartitionInfo>> topics = null;
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+    final Set<String> listedTopics = kafkaService.listTopics();
+
+    assertEquals(Sets.newHashSet(), listedTopics);
+
+    verifyZeroInteractions(zkUtils);
+    verify(kafkaConsumer).listTopics();
+    verifyNoMoreInteractions(kafkaConsumer, zkUtils, adminUtils);
+  }
+
+  @Test
+  public void listTopicsHappyPathWithListTopicsReturningEmptyMap() throws Exception {
+    final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+    final Set<String> listedTopics = kafkaService.listTopics();
+
+    assertEquals(Sets.newHashSet(), listedTopics);
+
+    verifyZeroInteractions(zkUtils);
+    verify(kafkaConsumer).listTopics();
+    verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+  }
+
+  @Test
+  public void listTopicsHappyPath() throws Exception {
+    final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+    topics.put("topic1", Lists.newArrayList());
+    topics.put("topic2", Lists.newArrayList());
+    topics.put("topic3", Lists.newArrayList());
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+    final Set<String> listedTopics = kafkaService.listTopics();
+
+    assertEquals(Sets.newHashSet("topic1", "topic2", "topic3"), listedTopics);
+
+    verifyZeroInteractions(zkUtils);
+    verify(kafkaConsumer).listTopics();
+    verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+  }
+
+  @Test
+  public void listTopicsShouldProperlyRemoveOffsetTopic() throws Exception {
+    final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+    topics.put("topic1", Lists.newArrayList());
+    topics.put("topic2", Lists.newArrayList());
+    topics.put("topic3", Lists.newArrayList());
+    topics.put("__consumer_offsets", Lists.newArrayList());
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+    final Set<String> listedTopics = kafkaService.listTopics();
+
+    assertEquals(Sets.newHashSet("topic1", "topic2", "topic3"), listedTopics);
+
+    verifyZeroInteractions(zkUtils);
+    verify(kafkaConsumer).listTopics();
+    verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+  }
+
+  @Test
+  public void deletingTopicThatDoesNotExistShouldReturnFalse() throws Exception {
+    when(kafkaConsumer.listTopics()).thenReturn(Maps.newHashMap());
+
+    assertFalse(kafkaService.deleteTopic("non_existent_topic"));
+
+    verifyZeroInteractions(zkUtils);
+    verify(kafkaConsumer).listTopics();
+    verifyNoMoreInteractions(kafkaConsumer, zkUtils);
+  }
+
+  @Test
+  public void deletingTopicThatExistsShouldReturnTrue() throws Exception {
+    final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+    topics.put("existing_topic", Lists.newArrayList());
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+    assertTrue(kafkaService.deleteTopic("existing_topic"));
+
+    verify(kafkaConsumer).listTopics();
+    verify(adminUtils).deleteTopic(zkUtils, "existing_topic");
+    verifyNoMoreInteractions(kafkaConsumer);
+  }
+
+  @Test
+  public void makeSureDeletingTopicReturnsFalseWhenNoTopicsExist() throws Exception {
+    final Map<String, List<PartitionInfo>> topics = null;
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+    assertFalse(kafkaService.deleteTopic("non_existent_topic"));
+
+    verify(kafkaConsumer).listTopics();
+    verifyNoMoreInteractions(kafkaConsumer);
+  }
+
+  @Test
+  public void getTopicShouldProperlyMapTopicToKafkaTopic() throws Exception {
+    final PartitionInfo partitionInfo = mock(PartitionInfo.class);
+    when(partitionInfo.replicas()).thenReturn(new Node[] {new Node(1, "host", 8080)});
+
+    final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+    topics.put("t", Lists.newArrayList(partitionInfo));
+    topics.put("t1", Lists.newArrayList());
+
+    final KafkaTopic expected = new KafkaTopic();
+    expected.setName("t");
+    expected.setNumPartitions(1);
+    expected.setReplicationFactor(1);
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+    when(kafkaConsumer.partitionsFor("t")).thenReturn(Lists.newArrayList(partitionInfo));
+
+    assertEquals(expected, kafkaService.getTopic("t"));
+  }
+
+  @Test
+  public void getTopicShouldProperlyHandleTopicsThatDontExist() throws Exception {
+    final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+    topics.put("t1", Lists.newArrayList());
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+    when(kafkaConsumer.partitionsFor("t")).thenReturn(Lists.newArrayList());
+
+    assertEquals(null, kafkaService.getTopic("t"));
+
+    verify(kafkaConsumer).listTopics();
+    verify(kafkaConsumer, times(0)).partitionsFor("t");
+    verifyZeroInteractions(zkUtils);
+    verifyNoMoreInteractions(kafkaConsumer);
+  }
+
+  @Test
+  public void createTopicShouldFailIfReplicationFactorIsGreaterThanAvailableBrokers() throws Exception {
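+    // AdminUtils is mocked here, so no broker-count validation actually occurs; the test only
+    // verifies that the request is delegated to adminUtils.createTopic with the requested settings.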
+    final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+    kafkaService.createTopic(VALID_KAFKA_TOPIC);
+
+    verify(adminUtils).createTopic(eq(zkUtils), eq("t"), eq(1), eq(2), eq(new Properties()), eq(RackAwareMode.Disabled$.MODULE$));
+    verify(kafkaConsumer).listTopics();
+    verifyZeroInteractions(zkUtils);
+  }
+
+  @Test
+  public void whenAdminUtilsThrowsAdminOperationExceptionCreateTopicShouldProperlyWrapExceptionInRestException() throws Exception {
+    exception.expect(RestException.class);
+
+    final Map<String, List<PartitionInfo>> topics = new HashMap<>();
+    topics.put("1", new ArrayList<>());
+
+    when(kafkaConsumer.listTopics()).thenReturn(topics);
+
+    doThrow(AdminOperationException.class).when(adminUtils).createTopic(eq(zkUtils), eq("t"), eq(1), eq(2), eq(new Properties()), eq(RackAwareMode.Disabled$.MODULE$));
+
+    kafkaService.createTopic(VALID_KAFKA_TOPIC);
+  }
+
+  @Test
+  public void getSampleMessageProperlyReturnsAMessageFromAGivenKafkaTopic() throws Exception {
+    final String topicName = "t";
+    final Node host = new Node(1, "host", 8080);
+    final Node[] replicas = {host};
+    final List<PartitionInfo> partitionInfo = Lists.newArrayList(new PartitionInfo(topicName, 1, host, replicas, replicas));
+    final TopicPartition topicPartition = new TopicPartition(topicName, 1);
+    final List<TopicPartition> topicPartitions = Lists.newArrayList(topicPartition);
+    final Set<TopicPartition> topicPartitionsSet = Sets.newHashSet(topicPartitions);
+    final ConsumerRecords<String, String> records = new ConsumerRecords<>(new HashMap<TopicPartition, List<ConsumerRecord<String, String>>>() {{
+      put(topicPartition, Lists.newArrayList(new ConsumerRecord<>(topicName, 1, 1, "k", "message")));
+    }});
+
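+    // Stub the consumer: the topic is listed, has a single partition, and poll() returns one record whose value is "message".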
+    when(kafkaConsumer.listTopics()).thenReturn(new HashMap<String, List<PartitionInfo>>() {{ put(topicName, Lists.newArrayList()); }});
+    when(kafkaConsumer.partitionsFor(eq(topicName))).thenReturn(partitionInfo);
+    when(kafkaConsumer.assignment()).thenReturn(topicPartitionsSet);
+    when(kafkaConsumer.position(topicPartition)).thenReturn(1L);
+    when(kafkaConsumer.poll(100)).thenReturn(records);
+
+    assertEquals("message", kafkaService.getSampleMessage(topicName));
+
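+    // The consumer should be assigned the partition, rewound one offset via seek, polled once, and then unsubscribed.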
+    verify(kafkaConsumer).assign(eq(topicPartitions));
+    verify(kafkaConsumer).assignment();
+    verify(kafkaConsumer).poll(100);
+    verify(kafkaConsumer).unsubscribe();
+    verify(kafkaConsumer, times(2)).position(topicPartition);
+    verify(kafkaConsumer).seek(topicPartition, 0);
+
+    verifyZeroInteractions(zkUtils, adminUtils);
+  }
+}