You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@metron.apache.org by justinleet <gi...@git.apache.org> on 2016/09/30 12:13:46 UTC

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

GitHub user justinleet opened a pull request:

    https://github.com/apache/incubator-metron/pull/286

    METRON-326 Error Handling in ElasticsearchWriter

    To facilitate partial successes writing in Elasticsearch (i.e. subset of Tuples succeeds and another subset fails), the BulkMessageWriter.write() now returns a BulkWriterResponse object which contains both the successful and failed Tuples (So that we know which ones to ack successfully and which to send to the error stream).
    
    ElasticsearchWriter has been modified to be able to take advantage of partial writes.  The other implementations of the BulkWriter have only been affected to produce total success at the end of their methods (which is the same functionality previously implied by not hitting an Exception.
    
    Unit tests have been added for the ElasticsearchWriter writers implementation of the partial successes and failures, along with modifying existing tests elsewhere to account for the BulkWriterResponse.
    
    One question I'd like resolved in review, is where to actually place the BulkWriterResponse in the code hierarchy.  It's obviously closely tied to the BulkMessageWriter (really only existing to add functionality to it), but that lives in an interface package and BWR is a concrete class. If there are opinions on where that should live, I'm happy to move it and update the PR.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/justinleet/incubator-metron es_error_handling

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-metron/pull/286.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #286
    
----
commit 8d7475d0f8857f40ab2a025a0d20b429ef084053
Author: justinjleet <ju...@gmail.com>
Date:   2016-09-27T15:02:14Z

    Adding functionality for partial write success in Elasticsearch

commit 83161afd1b1407c47a5565340bdfa96173955e7f
Author: justinjleet <ju...@gmail.com>
Date:   2016-09-30T11:40:21Z

    fix

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/286#discussion_r81339694
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---
    @@ -166,14 +166,37 @@ public void write(String sensorType, WriterConfiguration configurations, Iterabl
     
         }
     
    -    BulkResponse resp = bulkRequest.execute().actionGet();
    -
    -    if (resp.hasFailures()) {
    +    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
     
    -      throw new Exception(resp.buildFailureMessage());
    +    return buildWriteReponse(tuples, bulkResponse);
    +  }
     
    +  protected BulkWriterResponse buildWriteReponse(Iterable<Tuple> tuples, BulkResponse bulkResponse) throws Exception {
    +    // Elasticsearch responses are in the same order as the request, giving us an implicit mapping with Tuples
    +    BulkWriterResponse writerResponse = new BulkWriterResponse();
    +    if (bulkResponse.hasFailures()) {
    +      Iterator<BulkItemResponse> respIter = bulkResponse.iterator();
    +      Iterator<Tuple> tupleIter = tuples.iterator();
    +      while (respIter.hasNext() && tupleIter.hasNext()) {
    +        BulkItemResponse item = respIter.next();
    +        Tuple tuple = tupleIter.next();
    +
    +        if (item.isFailed()) {
    +          writerResponse.addError(item.getFailure().getCause(), tuple);
    +        } else {
    +          writerResponse.addSuccess(tuple);
    +        }
    +
    +        // Should never happen, so fail the entire batch if it does.
    +        if (respIter.hasNext() != tupleIter.hasNext()) {
    +          throw new Exception(bulkResponse.buildFailureMessage());
    --- End diff --
    
    Good catch, I'll make that change.  IllegalStateException seems pretty reasonable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/286#discussion_r81335456
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---
    @@ -166,14 +166,37 @@ public void write(String sensorType, WriterConfiguration configurations, Iterabl
     
         }
     
    -    BulkResponse resp = bulkRequest.execute().actionGet();
    -
    -    if (resp.hasFailures()) {
    +    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
     
    -      throw new Exception(resp.buildFailureMessage());
    +    return buildWriteReponse(tuples, bulkResponse);
    +  }
     
    +  protected BulkWriterResponse buildWriteReponse(Iterable<Tuple> tuples, BulkResponse bulkResponse) throws Exception {
    +    // Elasticsearch responses are in the same order as the request, giving us an implicit mapping with Tuples
    +    BulkWriterResponse writerResponse = new BulkWriterResponse();
    +    if (bulkResponse.hasFailures()) {
    +      Iterator<BulkItemResponse> respIter = bulkResponse.iterator();
    +      Iterator<Tuple> tupleIter = tuples.iterator();
    +      while (respIter.hasNext() && tupleIter.hasNext()) {
    +        BulkItemResponse item = respIter.next();
    +        Tuple tuple = tupleIter.next();
    +
    +        if (item.isFailed()) {
    +          writerResponse.addError(item.getFailure().getCause(), tuple);
    +        } else {
    +          writerResponse.addSuccess(tuple);
    +        }
    +
    +        // Should never happen, so fail the entire batch if it does.
    +        if (respIter.hasNext() != tupleIter.hasNext()) {
    +          throw new Exception(bulkResponse.buildFailureMessage());
    --- End diff --
    
    Perhaps a named exception?  `IllegalStateException`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron issue #286: METRON-326 Error Handling in ElasticsearchWrite...

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/incubator-metron/pull/286
  
    Updated with moving the files (did need to make metron-hbase depend on metron-writer), fixed the typo, and added the named exception.
    
    Have not modified the writers to catch the exceptions and pass the errors up that way.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/286#discussion_r81365456
  
    --- Diff: metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.metron.elasticsearch.writer;
    +
    +import backtype.storm.tuple.Tuple;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.metron.writer.BulkWriterResponse;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.junit.Test;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.junit.Assert.*;
    +
    +
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +public class ElasticsearchWriterTest {
    +    @Test
    +    public void testSingleSuccesses() throws Exception {
    +        Tuple tuple1 = mock(Tuple.class);
    +
    +        BulkResponse response = mock(BulkResponse.class);
    +        when(response.hasFailures()).thenReturn(false);
    +
    +        BulkWriterResponse expected = new BulkWriterResponse();
    +        expected.addSuccess(tuple1);
    +
    +        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    +        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
    +
    +        assertEquals("Response should have no errors and single success", expected, actual);
    +    }
    +
    +    @Test
    +    public void testMultipleSuccesses() throws Exception {
    +        Tuple tuple1 = mock(Tuple.class);
    +        Tuple tuple2 = mock(Tuple.class);
    +
    +        BulkResponse response = mock(BulkResponse.class);
    +        when(response.hasFailures()).thenReturn(false);
    +
    +        BulkWriterResponse expected = new BulkWriterResponse();
    +        expected.addSuccess(tuple1);
    +        expected.addSuccess(tuple2);
    +
    +        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    +        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
    +
    +        assertEquals("Response should have no errors and two successes", expected, actual);
    +    }
    +
    +    @Test
    +    public void testSingleFailure() throws Exception {
    +        Tuple tuple1 = mock(Tuple.class);
    +
    +        BulkResponse response = mock(BulkResponse.class);
    +        when(response.hasFailures()).thenReturn(true);
    +
    +        Exception e = new IllegalStateException();
    +        BulkItemResponse itemResponse = buildBulkItemFailure(e);
    +        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse).iterator());
    +
    +        BulkWriterResponse expected = new BulkWriterResponse();
    +        expected.addError(e, tuple1);
    +
    +        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    +        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
    +
    +        assertEquals("Response should have no errors and two successes", expected, actual);
    --- End diff --
    
    I believe this assert message should read "Response should have one error and zero successes"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/286#discussion_r81339191
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java ---
    @@ -69,14 +66,19 @@ public void init(Map stormConfig, WriterConfiguration configurations) {
     
     
       @Override
    -  public void write(String sourceType
    +  public BulkWriterResponse write(String sourceType
                        , WriterConfiguration configurations
                        , Iterable<Tuple> tuples
                        , List<JSONObject> messages
                        ) throws Exception
       {
         SourceHandler handler = getSourceHandler(sourceType);
         handler.handle(messages);
    +
    +    // HDFS SourceHanlder manages writing and rotating files.  Will throw its own Exceptions.
    --- End diff --
    
    typo on `SourceHandler`.
    
    Also, if the `SourceHandler` manages writing and will throw exceptions, do we want to catch the exception, add the tuples as errors to the response and rethrow?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron issue #286: METRON-326 Error Handling in ElasticsearchWrite...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/286
  
    +1 by inspection


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron issue #286: METRON-326 Error Handling in ElasticsearchWrite...

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/incubator-metron/pull/286
  
    Moving the interfaces `metron-writer` should be pretty trivial.  The only things that depend on them, unless I missed something, also depend on either `metron-writer` directly or `metron-enrichment` (which itself depends on `metron-writer`.  I'll move stuff, make sure nothing implodes and update accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/286#discussion_r81378509
  
    --- Diff: metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.metron.elasticsearch.writer;
    +
    +import backtype.storm.tuple.Tuple;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.metron.writer.BulkWriterResponse;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.junit.Test;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.junit.Assert.*;
    +
    +
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +public class ElasticsearchWriterTest {
    +    @Test
    +    public void testSingleSuccesses() throws Exception {
    +        Tuple tuple1 = mock(Tuple.class);
    +
    +        BulkResponse response = mock(BulkResponse.class);
    +        when(response.hasFailures()).thenReturn(false);
    +
    +        BulkWriterResponse expected = new BulkWriterResponse();
    +        expected.addSuccess(tuple1);
    +
    +        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    +        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
    +
    +        assertEquals("Response should have no errors and single success", expected, actual);
    +    }
    +
    +    @Test
    +    public void testMultipleSuccesses() throws Exception {
    +        Tuple tuple1 = mock(Tuple.class);
    +        Tuple tuple2 = mock(Tuple.class);
    +
    +        BulkResponse response = mock(BulkResponse.class);
    +        when(response.hasFailures()).thenReturn(false);
    +
    +        BulkWriterResponse expected = new BulkWriterResponse();
    +        expected.addSuccess(tuple1);
    +        expected.addSuccess(tuple2);
    +
    +        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    +        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
    +
    +        assertEquals("Response should have no errors and two successes", expected, actual);
    +    }
    +
    +    @Test
    +    public void testSingleFailure() throws Exception {
    +        Tuple tuple1 = mock(Tuple.class);
    +
    +        BulkResponse response = mock(BulkResponse.class);
    +        when(response.hasFailures()).thenReturn(true);
    +
    +        Exception e = new IllegalStateException();
    +        BulkItemResponse itemResponse = buildBulkItemFailure(e);
    +        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse).iterator());
    +
    +        BulkWriterResponse expected = new BulkWriterResponse();
    +        expected.addError(e, tuple1);
    +
    +        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    +        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
    +
    +        assertEquals("Response should have no errors and two successes", expected, actual);
    --- End diff --
    
    You are correct, will update


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/286#discussion_r81340830
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java ---
    @@ -69,14 +66,19 @@ public void init(Map stormConfig, WriterConfiguration configurations) {
     
     
       @Override
    -  public void write(String sourceType
    +  public BulkWriterResponse write(String sourceType
                        , WriterConfiguration configurations
                        , Iterable<Tuple> tuples
                        , List<JSONObject> messages
                        ) throws Exception
       {
         SourceHandler handler = getSourceHandler(sourceType);
         handler.handle(messages);
    +
    +    // HDFS SourceHanlder manages writing and rotating files.  Will throw its own Exceptions.
    --- End diff --
    
    Will fix typo.  Same deal as previous comment where it's pretty easy to change and manage that directly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/286#discussion_r81338875
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java ---
    @@ -44,11 +45,15 @@ public void init(Map stormConf, WriterConfiguration config) throws Exception {
       }
     
       @Override
    -  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
    +  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
         if(messages.size() > 1) {
           throw new IllegalStateException("WriterToBulkWriter expects a batch of exactly 1");
         }
         messageWriter.write(sensorType, configurations, Iterables.getFirst(tuples, null), Iterables.getFirst(messages, null));
    +
    +    BulkWriterResponse response = new BulkWriterResponse();
    --- End diff --
    
    What if there's an error writing?  Do we still want to add the tuples to the response as successes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron issue #286: METRON-326 Error Handling in ElasticsearchWrite...

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/incubator-metron/pull/286
  
    Moving things using option 2 (writer package in common) seems stable, after running locally a few times.  Travis also passed.  Not sure why moving them to different module causes such issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron issue #286: METRON-326 Error Handling in ElasticsearchWrite...

Posted by cestella <gi...@git.apache.org>.
Github user cestella commented on the issue:

    https://github.com/apache/incubator-metron/pull/286
  
    I'd vote for one of two options:
    * Investigate how if we can move those interfaces away from `metron-common` and into `metron-writer` into the `org.apache.metron.writer` package.
    * If you can't do above, I'd move `BulkMessageWriter`, `MessageWriter` and the new response object you've created into `org.apache.metron.common.writer`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-metron/pull/286


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron issue #286: METRON-326 Error Handling in ElasticsearchWrite...

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on the issue:

    https://github.com/apache/incubator-metron/pull/286
  
    Moving everything to `metron-writer` seems to cause random build failures (e.g. the one above), based on how things get spun up.  It is not deterministic.  I'm going to try using option 2, and try testing that and seeing if it's consistent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-metron pull request #286: METRON-326 Error Handling in Elasticsear...

Posted by justinleet <gi...@git.apache.org>.
Github user justinleet commented on a diff in the pull request:

    https://github.com/apache/incubator-metron/pull/286#discussion_r81340588
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java ---
    @@ -44,11 +45,15 @@ public void init(Map stormConf, WriterConfiguration config) throws Exception {
       }
     
       @Override
    -  public void write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
    +  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
         if(messages.size() > 1) {
           throw new IllegalStateException("WriterToBulkWriter expects a batch of exactly 1");
         }
         messageWriter.write(sensorType, configurations, Iterables.getFirst(tuples, null), Iterables.getFirst(messages, null));
    +
    +    BulkWriterResponse response = new BulkWriterResponse();
    --- End diff --
    
    Right now, the Exception handling path is slightly different from the main path, and would just treat everything as failures, regardless. The BulkWriterComponent catches exceptions and uses its complete list to mark everything as failed regardless of what the response is.  
    
    The write() method interface does note in the comments that an exception should be treated as a full batch failure, but it could pretty easily be modified to catch the exception, mark failures and kick it back up that way. I left it as-is, because that matches current behavior exactly.  At that point, I'd want to do the same thing for all the implementations (hbase, etc.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---