You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/12/19 18:48:50 UTC

incubator-metron git commit: METRON-625: Parser Filters cannot be specified from the sensor config closes apache/incubator-metron#396

Repository: incubator-metron
Updated Branches:
  refs/heads/master ec41e7321 -> befeb439e


METRON-625: Parser Filters cannot be specified from the sensor config closes apache/incubator-metron#396


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/befeb439
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/befeb439
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/befeb439

Branch: refs/heads/master
Commit: befeb439e70aa45a051597e513f5ae9d9145864a
Parents: ec41e73
Author: cstella <ce...@gmail.com>
Authored: Mon Dec 19 13:48:32 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Mon Dec 19 13:48:32 2016 -0500

----------------------------------------------------------------------
 metron-platform/metron-parsers/README.md        | 32 +++++------
 .../apache/metron/parsers/bolt/ParserBolt.java  | 21 +++----
 .../apache/metron/parsers/filters/Filters.java  | 17 +++---
 .../parsers/filters/GenericMessageFilter.java   | 43 ---------------
 .../metron/parsers/filters/QueryFilter.java     | 58 --------------------
 .../metron/parsers/filters/StellarFilter.java   | 58 ++++++++++++++++++++
 .../org/apache/metron/filters/FiltersTest.java  | 14 +----
 .../metron/parsers/bolt/ParserBoltTest.java     | 57 ++++++++++++++++---
 .../metron/writer/BulkWriterComponent.java      |  2 +-
 9 files changed, 144 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/befeb439/metron-platform/metron-parsers/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/README.md b/metron-platform/metron-parsers/README.md
index 0be3da4..cb51274 100644
--- a/metron-platform/metron-parsers/README.md
+++ b/metron-platform/metron-parsers/README.md
@@ -82,6 +82,17 @@ documents stored in zookeeper.
 The document is structured in the following way
 
 * `parserClassName` : The fully qualified classname for the parser to be used.
+* `filterClassName` : The filter to use.  This may be a fully qualified classname of a Class that implements the `org.apache.metron.parsers.interfaces.MessageFilter<JSONObject>` interface.  Message Filters are intended to allow the user to ignore a set of messages via custom logic.  The existing implementations are:
+  * `STELLAR` : Allows you to apply a stellar statement which returns a boolean, which will pass every message for which the statement returns `true`.  The Stellar statement that is to be applied is specified by the `filter.query` property in the `parserConfig`.
+Example Stellar Filter which includes messages which contain a the `field1` field:
+```
+   {
+    "filterClassName" : "STELLAR"
+   ,"parserConfig" : {
+    "filter.query" : "exists(field1)"
+    }
+   }
+```
 * `sensorTopic` : The kafka topic to send the parsed messages to.
 * `parserConfig` : A JSON Map representing the parser implementation specific configuration.
 * `fieldTransformations` : An array of complex objects representing the transformations to be done on the message generated from the parser before writing out to the kafka topic.
@@ -214,27 +225,10 @@ Consider the following example configuration for the `yaf` sensor:
 }
 ```
 
-##Parser Bolt
-
-The Metron parser bolt is a standard bolt, which can be extended with multiple Java and Grok parser adapter for parsing different topology messages.  The bolt signature for declaration in a storm topology is as follows:
-
-```
-AbstractParserBolt parser_bolt = new TelemetryParserBolt()
-.withMessageParser(parser)
-.withMessageFilter(new GenericMessageFilter())
-.withMetricConfig(config);
-
-```
-
-Metric Config - optional argument for exporting custom metrics to graphite.  If set to null no metrics will be exported.  If set, then a list of metrics defined in the metrics.conf file of each topology will define will metrics are exported and how often.
-
-Message Filter - a filter defining which messages can be dropped.  This feature is only present in the Java paerer adapters
-
-Message Parser - defines the parser adapter to be used for a topology
-
 ##Parser Adapters
 
-Parser adapters are loaded dynamically in each Metron topology.  They are defined in topology.conf in the configuration item bolt.parser.adapter
+Parser adapters are loaded dynamically in each Metron topology.  They
+are defined in the Parser Config (defined above) JSON file in Zookeeper.
 
 ###Java Parser Adapters
 Java parser adapters are indended for higher-velocity topologies and are not easily changed or extended.  As the adoption of Metron continues we plan on extending our library of Java adapters to process more log formats.  As of this moment the Java adapters included with Metron are:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/befeb439/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index 557e71d..abbd9d8 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.bolt;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -31,7 +32,6 @@ import org.apache.metron.common.dsl.Context;
 import org.apache.metron.common.dsl.StellarFunctions;
 import org.apache.metron.parsers.filters.Filters;
 import org.apache.metron.common.configuration.FieldTransformer;
-import org.apache.metron.parsers.filters.GenericMessageFilter;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.apache.metron.parsers.interfaces.MessageParser;
@@ -47,7 +47,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(ParserBolt.class);
   private OutputCollector collector;
   private MessageParser<JSONObject> parser;
-  private MessageFilter<JSONObject> filter = new GenericMessageFilter();
+  //default filter is noop, so pass everything through.
+  private MessageFilter<JSONObject> filter;
   private WriterHandler writer;
   private org.apache.metron.common.dsl.Context stellarContext;
   public ParserBolt( String zookeeperUrl
@@ -73,15 +74,15 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     super.prepare(stormConf, context, collector);
     this.collector = collector;
     initializeStellar();
-    if(getSensorParserConfig() == null) {
-      filter = new GenericMessageFilter();
-    }
-    else if(filter == null) {
+    if(getSensorParserConfig() != null && filter == null) {
       getSensorParserConfig().getParserConfig().putIfAbsent("stellarContext", stellarContext);
-      filter = Filters.get(getSensorParserConfig().getFilterClassName()
-              , getSensorParserConfig().getParserConfig()
-      );
+      if (!StringUtils.isEmpty(getSensorParserConfig().getFilterClassName())) {
+        filter = Filters.get(getSensorParserConfig().getFilterClassName()
+                , getSensorParserConfig().getParserConfig()
+        );
+      }
     }
+
     parser.init();
 
     writer.init(stormConf, collector, getConfigurations());
@@ -124,7 +125,7 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
               handler.transformAndUpdate(message, sensorParserConfig.getParserConfig(), stellarContext);
             }
           }
-          if (parser.validate(message) && filter != null && filter.emitTuple(message, stellarContext)) {
+          if (parser.validate(message) && (filter == null || filter.emitTuple(message, stellarContext))) {
             numWritten++;
             if(!isGloballyValid(message, fieldValidations)) {
               message.put(Constants.SENSOR_TYPE, getSensorType()+ ".invalid");

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/befeb439/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
index fc9a49e..03a4098 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/Filters.java
@@ -22,14 +22,12 @@ import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.json.simple.JSONObject;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.util.Map;
 
 public enum Filters {
    BRO(BroMessageFilter.class)
-  ,QUERY(QueryFilter.class)
-  ,DEFAULT(GenericMessageFilter.class)
+  ,STELLAR(StellarFilter.class)
+  ,DEFAULT(null)
   ;
   Class<? extends MessageFilter> clazz;
   Filters(Class<? extends MessageFilter> clazz) {
@@ -37,7 +35,7 @@ public enum Filters {
   }
   public static MessageFilter<JSONObject> get(String filterName, Map<String, Object> config) {
     if(filterName == null || filterName.trim().isEmpty()) {
-      return new GenericMessageFilter();
+      return null;
     }
     Class<? extends MessageFilter> filterClass;
     try {
@@ -51,8 +49,11 @@ public enum Filters {
         throw new IllegalStateException("Unable to find class " + filterName, e);
       }
     }
-    MessageFilter<JSONObject> filter = ReflectionUtils.createInstance(filterClass);
-    filter.configure(config);
-    return filter;
+    if(filterClass != null) {
+      MessageFilter<JSONObject> filter = ReflectionUtils.createInstance(filterClass);
+      filter.configure(config);
+      return filter;
+    }
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/befeb439/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
deleted file mode 100644
index 1441787..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/GenericMessageFilter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.parsers.filters;
-
-import org.apache.metron.common.dsl.Context;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public class GenericMessageFilter implements MessageFilter<JSONObject>{
-
-	private static final long serialVersionUID = 3626397212398318852L;
-
-	public GenericMessageFilter() {
-	}
-
-	@Override
-	public boolean emitTuple(JSONObject message, Context context) {
-		return true;
-	}
-
-	@Override
-	public void configure(Map<String, Object> config) {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/befeb439/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java
deleted file mode 100644
index b56ab71..0000000
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/QueryFilter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.parsers.filters;
-
-import org.apache.metron.common.dsl.*;
-import org.apache.metron.common.dsl.functions.resolver.FunctionResolver;
-import org.apache.metron.common.stellar.StellarPredicateProcessor;
-import org.apache.metron.parsers.interfaces.MessageFilter;
-import org.json.simple.JSONObject;
-
-import java.util.Map;
-
-public class QueryFilter implements MessageFilter<JSONObject> {
-  public static final String QUERY_STRING_CONF = "filter.query";
-  private StellarPredicateProcessor processor = new StellarPredicateProcessor();
-  private String query;
-  private FunctionResolver functionResolver = StellarFunctions.FUNCTION_RESOLVER();
-
-  public QueryFilter()
-  {
-
-  }
-
-  @Override
-  public void configure(Map<String, Object> config) {
-    Object o = config.get(QUERY_STRING_CONF);
-    if(o instanceof String) {
-      query= o.toString();
-    }
-    Context stellarContext = (Context) config.get("stellarContext");
-    if(stellarContext == null) {
-      stellarContext = Context.EMPTY_CONTEXT();
-    }
-    processor.validate(query, true, stellarContext);
-  }
-
-  @Override
-  public boolean emitTuple(JSONObject message, Context context) {
-    VariableResolver resolver = new MapVariableResolver(message);
-    return processor.parse(query, resolver, functionResolver, context);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/befeb439/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
new file mode 100644
index 0000000..a564e62
--- /dev/null
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/filters/StellarFilter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.parsers.filters;
+
+import org.apache.metron.common.dsl.*;
+import org.apache.metron.common.dsl.functions.resolver.FunctionResolver;
+import org.apache.metron.common.stellar.StellarPredicateProcessor;
+import org.apache.metron.parsers.interfaces.MessageFilter;
+import org.json.simple.JSONObject;
+
+import java.util.Map;
+
+public class StellarFilter implements MessageFilter<JSONObject> {
+  public static final String QUERY_STRING_CONF = "filter.query";
+  private StellarPredicateProcessor processor = new StellarPredicateProcessor();
+  private String query;
+  private FunctionResolver functionResolver = StellarFunctions.FUNCTION_RESOLVER();
+
+  public StellarFilter()
+  {
+
+  }
+
+  @Override
+  public void configure(Map<String, Object> config) {
+    Object o = config.get(QUERY_STRING_CONF);
+    if(o instanceof String) {
+      query= o.toString();
+    }
+    Context stellarContext = (Context) config.get("stellarContext");
+    if(stellarContext == null) {
+      stellarContext = Context.EMPTY_CONTEXT();
+    }
+    processor.validate(query, true, stellarContext);
+  }
+
+  @Override
+  public boolean emitTuple(JSONObject message, Context context) {
+    VariableResolver resolver = new MapVariableResolver(message);
+    return processor.parse(query, resolver, functionResolver, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/befeb439/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
index dde9f7d..885fc3c 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/filters/FiltersTest.java
@@ -19,28 +19,20 @@
 package org.apache.metron.filters;
 
 import com.google.common.collect.ImmutableMap;
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.logging.log4j.core.filter.AbstractFilter;
-import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.dsl.Context;
-import org.apache.metron.parsers.filters.AbstractMessageFilter;
 import org.apache.metron.parsers.filters.Filters;
-import org.apache.metron.parsers.filters.GenericMessageFilter;
 import org.apache.metron.parsers.interfaces.MessageFilter;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
 public class FiltersTest {
   @Test
   public void testDefault() {
-    Assert.assertTrue(Filters.get("DEFAULT", null).emitTuple(null, null));
-    Assert.assertTrue(Filters.get(GenericMessageFilter.class.getName(), null).emitTuple(null, null));
+    Assert.assertNull(Filters.get("DEFAULT", null));
   }
 
   @Test
@@ -49,10 +41,10 @@ public class FiltersTest {
       Map<String, Object> config = new HashMap<String, Object>() {{
         put("filter.query", "exists(foo)");
       }};
-      MessageFilter<JSONObject> filter = Filters.get("QUERY", config);
+      MessageFilter<JSONObject> filter = Filters.get(Filters.STELLAR.name(), config);
       Assert.assertTrue(filter.emitTuple(new JSONObject(ImmutableMap.of("foo", 1)), Context.EMPTY_CONTEXT()));
       Assert.assertFalse(filter.emitTuple(new JSONObject(ImmutableMap.of("bar", 1)), Context.EMPTY_CONTEXT()));
     }
-
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/befeb439/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index ea26d07..1efcf2e 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -52,10 +52,7 @@ import java.util.stream.Stream;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 public class ParserBoltTest extends BaseBoltTest {
 
@@ -245,7 +242,7 @@ public void testImplicitBatchOfOne() throws Exception {
 
   /**
    {
-    "filterClassName" : "QUERY"
+    "filterClassName" : "STELLAR"
    ,"parserConfig" : {
     "filter.query" : "exists(field1)"
     }
@@ -253,8 +250,13 @@ public void testImplicitBatchOfOne() throws Exception {
    */
   @Multiline
   public static String sensorParserConfig;
+
+  /**
+   * Tests to ensure that a message that is unfiltered results in one write and an ack.
+   * @throws Exception
+   */
   @Test
-  public void testFilter() throws Exception {
+  public void testFilterSuccess() throws Exception {
     String sensorType = "yaf";
 
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
@@ -272,13 +274,50 @@ public void testImplicitBatchOfOne() throws Exception {
     parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(parser, times(1)).init();
     verify(batchWriter, times(1)).init(any(), any());
+    BulkWriterResponse successResponse = mock(BulkWriterResponse.class);
+    when(successResponse.getSuccesses()).thenReturn(ImmutableList.of(t1));
+    when(batchWriter.write(any(), any(), any(), any())).thenReturn(successResponse);
     when(parser.validate(any())).thenReturn(true);
-    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject())));
-    parserBolt.withMessageFilter(filter);
+    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
+      put("field1", "blah");
+    }}))));
     parserBolt.execute(t1);
+    verify(batchWriter, times(1)).write(any(), any(), any(), any());
     verify(outputCollector, times(1)).ack(t1);
   }
 
+
+  /**
+   * Tests to ensure that a message filtered out results in no writes, but an ack.
+   * @throws Exception
+   */
+  @Test
+  public void testFilterFailure() throws Exception {
+    String sensorType = "yaf";
+
+    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", sensorType, parser, new WriterHandler(batchWriter)) {
+      @Override
+      protected SensorParserConfig getSensorParserConfig() {
+        try {
+          return SensorParserConfig.fromBytes(Bytes.toBytes(sensorParserConfig));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    parserBolt.setCuratorFramework(client);
+    parserBolt.setTreeCache(cache);
+    parserBolt.prepare(new HashMap(), topologyContext, outputCollector);
+    verify(parser, times(1)).init();
+    verify(batchWriter, times(1)).init(any(), any());
+    when(parser.validate(any())).thenReturn(true);
+    when(parser.parseOptional(any())).thenReturn(Optional.of(ImmutableList.of(new JSONObject(new HashMap<String, Object>() {{
+      put("field2", "blah");
+    }}))));
+    parserBolt.execute(t1);
+    verify(batchWriter, times(0)).write(any(), any(), any(), any());
+    verify(outputCollector, times(1)).ack(t1);
+  }
   /**
   {
      "sensorTopic":"dummy"
@@ -478,6 +517,8 @@ public void testImplicitBatchOfOne() throws Exception {
     verify(outputCollector, times(1)).ack(t5);
 
   }
+
+
   private static void writeNonBatch(OutputCollector collector, ParserBolt bolt, Tuple t) {
     bolt.execute(t);
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/befeb439/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index 738ccac..a601987 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -57,7 +57,7 @@ public class BulkWriterComponent<MESSAGE_T> {
   }
 
   public void commit(BulkWriterResponse response) {
-      commit(response.getSuccesses());
+    commit(response.getSuccesses());
   }
 
   public void error(Throwable e, Iterable<Tuple> tuples) {