You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@metron.apache.org by nickwallen <gi...@git.apache.org> on 2018/11/05 20:19:17 UTC

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

GitHub user nickwallen opened a pull request:

    https://github.com/apache/metron/pull/1254

    METRON-1849 Elasticsearch Index Write Functionality Should be Shared

    The Elasticsearch index write functionality is currently duplicated between the `ElasticsearchWriter` and the `ElasticsearchUpdateDao`.  This functionality needs to be de-duplicated and shared between the two.  This will help ensure that any changes that need to be made to how alerts are written, for instance whether the client sets a document ID, are implemented and tested in the same way across Metron.
    
    This change depends on the following PRs.
    - [ ] #1242
    - [ ] #1247
    
    ## Changes
    
    * Created the `BulkDocumentWriter` and `ElasticsearchBulkDocumentWriter` that are responsible for writing messages/alerts/documents to an Elasticsearch index.
    
    * Created the `IndexedDocument` which is what the `BulkDocumentWriter` actually writes.  This is a `Document` that contains the name of the index it is written to.  
    
        * I could not just add `index` to `Document` because the `HbaseDao` does not know how to populate this field.  When using a `MultiIndexDao`, the `Document` returned can come from any of the underlying DAOs, including the `HBaseDao`, so this field cannot always be populated with the current archtecture.
    
    * The `ElasticsearchWriter` uses a `BulkDocumentWriter` to index messages in Elasticsearch. This also uses a `TupleBasedDocument` that allows it to maintain the relationship between a tuple and the document created from a tuple.  This is needed to properly ack/fail tuples.
    
    * The scope of the `ElasticsearchWriter` tests were increased since this was now easier with the new abstractions. 
    
    * The `ElasticsearchUpdateDao` uses a `BulkDocumentWriter` to update alerts from the Alerts UI.
    
    * The `ElasticsearchRetrieveLatestDao` was setting the timestamp of all `Document`s returned to 0.  This was causing the tests that I added to fail.  I added logic to extract the timestamp from the message and set the `Document.timestamp` if one exists.
    
    
    Instead of creating the `BulkDocumentWriter` abstraction, I also looked at a couple alternatives.
    
    * Another approach I looked at was to simply have the `ElasticsearchWriter` use an `ElasticsearchUpdateDao`.  Unfortunately, this quickly led to a large number of changes. The `ElasticsearchUpdateDao` does not track and record individual failures in a batch.  It instead fails fast if any single update in a batch fails.  To track individual failures, as needed by the `ElasticsearchWriter`, would have required changes to the signature of `UpdateDao`.
    
    * Another approach was to have the `ElasticsearchUpdateDao` use an `ElasticsearchWriter`.  Unfortunately, the `ElasticsearchWriter` is a `BulkMessageWriter` and requires Storm as a dependency due to its use of `TopologyContext`.  To clean this up would have required a large number of changes.
    
    ## Testing
    
    Spin-up the Full Dev environment.  Use the Alerts UI to:
     * Search for alerts
     * Escalate alerts
     * Add comments to alerts
     * Remove comments from alerts
     * Create meta-alerts
     * Escalate meta-alerts
    
    ## Pull Request Checklist
    
    - [ ] Is there a JIRA ticket associated with this PR? If not one needs to be created at [Metron Jira](https://issues.apache.org/jira/browse/METRON/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel).
    - [ ] Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    - [ ] Have you included steps to reproduce the behavior or problem that is being changed or addressed?
    - [ ] Have you included steps or a guide to how the change may be verified and tested manually?
    - [ ] Have you ensured that the full suite of tests and checks have been executed in the root metron folder via:
    - [ ] Have you written or updated unit tests and or integration tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nickwallen/metron METRON-1849

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/metron/pull/1254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1254
    
----
commit a7c7dc287b4f9c99c6780b934a0b6f433a03aa04
Author: cstella <ce...@...>
Date:   2018-10-09T00:06:52Z

    Casey Stella - elasticsearch rest client migration base work

commit 10410ea9718a2a1b1d287fb4f22a6c98efb1fdaa
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-09T00:07:22Z

    Update shade plugin version

commit a33a16872118175ed35729df6ddde2959e49ae2f
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-09T15:56:08Z

    Fix es update dao test

commit 52c3c96d7657205e65d3bd0c0e35923a851911da
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-09T21:26:16Z

    Merge with master. Fix es search integration tests

commit 4742832869f9512e11cab4a32109c15a2b17a92e
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-11T18:45:06Z

    Merge branch 'master' into es-rest-client

commit 43809968320e586fd70411776140f3aa13a60195
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-11T23:59:25Z

    Get shade plugin working with the new ES client and the ClassIndexTransformer Shade plugin transformer.

commit af03f6f036e96c742db39733bf8ebc2cbf229129
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-19T02:56:52Z

    Introduce config classes for managing ES client configuration. Translate properties for new client.

commit 1c8eac2a173e1e24afc9e11163e456a9d90c93db
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-23T22:21:32Z

    Resolve merge conflicts with master

commit 1a47ded7a36f9d391227973c7da2921305373283
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-23T22:23:04Z

    Remove extra deps in metron-elasticsearch around log4j.

commit 54870d68e6f43e859367879bff7537f97c11d0bd
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-24T00:33:25Z

    Fixes for dep version issues.

commit 554de87ae160aae1ae85afb3dbb01a220a9d6838
Author: Nick Allen <ni...@...>
Date:   2018-10-25T18:54:41Z

    METRON-1845 Correct Test Data Load in Elasticsearch Integration Tests

commit 3b95d1ece278bf517642a0b0a8eee5a7ea021b9e
Author: Nick Allen <ni...@...>
Date:   2018-10-25T19:06:39Z

    Removed dead function

commit 7d9ee2563fdcc0e6b0a3d5106ea568577f21d303
Author: Nick Allen <ni...@...>
Date:   2018-10-26T18:55:03Z

    Part way through Solr changes that I am going to roll-back

commit e6993867ebd00a5c09ef1a4da772664a11d1f212
Author: Nick Allen <ni...@...>
Date:   2018-10-26T18:55:15Z

    Revert "Part way through Solr changes that I am going to roll-back"
    
    This reverts commit 7d9ee2563fdcc0e6b0a3d5106ea568577f21d303.

commit dd024b818b238e24430385914cef6ea12873bfc6
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-26T19:21:37Z

    Merge branch 'master' into es-rest-client

commit 4a7a8370e61e427ac790d2e21bc43a55ac45636b
Author: Nick Allen <ni...@...>
Date:   2018-10-26T19:29:09Z

    Resolved integration test differences with Solr

commit ebb7ef57c9fb20fb957b86516734fed1c18cda84
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-26T21:10:54Z

    Start addressing pr feedback.

commit 35811175824415c1e37418bb2efb347e6f7a801a
Author: Nick Allen <ni...@...>
Date:   2018-10-29T16:10:05Z

    METRON-1849 Added integration test for UpdateDao.update() and UpdateDao.batchUpdate()

commit b7ac957164418d733e6c8fb8aa896186bbc5176f
Author: Michael Miklavcic <mi...@...>
Date:   2018-10-30T23:30:08Z

    Addressing review feedback

commit c0faab2aaf04475ffc9e5558018b538fb62a3c7b
Author: Nick Allen <ni...@...>
Date:   2018-10-31T15:43:47Z

    METRON-1849 Progress on shared write logic

commit 021810a0edb116c3b9bcf12cb8a77bf4197f24fb
Author: Nick Allen <ni...@...>
Date:   2018-10-31T20:51:32Z

    METRON-1849 Progress on tests, still integration tests to fix-up

commit 056497034f7d0b5ebba7d8a66f1b6ff66ebfd119
Author: Nick Allen <ni...@...>
Date:   2018-11-01T13:57:06Z

    METRON-1845 Added license header

commit f1c8ce4eb5d404f2a1d99e51e3111586221e13a0
Author: Nick Allen <ni...@...>
Date:   2018-11-01T21:36:56Z

    Most tests working except for a few

commit 3e5c12169e8cea5da5349cf699b5519cdc82bca1
Author: Michael Miklavcic <mi...@...>
Date:   2018-11-02T04:56:55Z

    Refactor clientfactory. Updates to metron-common, metron-elasticsearch, and kerberos/security documentation

commit d4b02ce80a6c72cdf4595f9a47a4955f59b0a5e4
Author: Nick Allen <ni...@...>
Date:   2018-11-02T20:12:52Z

    Fixed all tests

commit fb5610b0aba13cdef7b6440e32e2c838bee3babb
Author: Nick Allen <ni...@...>
Date:   2018-11-02T20:22:09Z

    Merge remote-tracking branch 'mike/es-rest-client' into METRON-1849

commit c3b28363e87f75e97bdc9ab14d30156136080af7
Author: Nick Allen <ni...@...>
Date:   2018-11-02T20:27:42Z

    Fixed how client is created

commit 4539f43c43334696e33a861e7cbbb76a83c217b6
Author: Nick Allen <ni...@...>
Date:   2018-11-02T21:55:12Z

    Improved docs

commit 32467b375f6e6fa3ac268fc0e5f4a6a447a48f76
Author: Nick Allen <ni...@...>
Date:   2018-11-05T16:59:26Z

    Small cleanup

----


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r239114286
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java ---
    @@ -0,0 +1,79 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.elasticsearch.bulk;
    +
    +import org.apache.metron.indexing.dao.update.Document;
    +
    +import java.util.List;
    +
    +/**
    + * Writes documents to an index in bulk.
    + *
    + * <p>Partial failures within a batch can be handled individually by registering
    + * a {@link FailureListener}.
    + *
    + * @param <D> The type of document to write.
    + */
    +public interface BulkDocumentWriter<D extends Document> {
    +
    +    /**
    +     * A listener that is notified when a set of documents have been
    +     * written successfully.
    +     * @param <D> The type of document to write.
    +     */
    +    interface SuccessListener<D extends Document> {
    --- End diff --
    
    Done. See latest commit.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r239058263
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---
    @@ -56,90 +60,111 @@
        */
       private transient ElasticsearchClient client;
     
    +  /**
    +   * Responsible for writing documents.
    +   *
    +   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
    +   * a {@link Tuple} and the document created from the contents of that tuple. If
    +   * a document cannot be written, the associated tuple needs to be failed.
    +   */
    +  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
    +
       /**
        * A simple data formatter used to build the appropriate Elasticsearch index name.
        */
       private SimpleDateFormat dateFormat;
     
    -
       @Override
       public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
    -
         Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
    -    client = ElasticsearchClientFactory.create(globalConfiguration);
         dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
    +
    +    // only create the document writer, if one does not already exist. useful for testing.
    +    if(documentWriter == null) {
    +      client = ElasticsearchClientFactory.create(globalConfiguration);
    +      documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
    +    }
       }
     
       @Override
    -  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
    +  public BulkWriterResponse write(String sensorType,
    +                                  WriterConfiguration configurations,
    +                                  Iterable<Tuple> tuplesIter,
    +                                  List<JSONObject> messages) {
     
         // fetch the field name converter for this sensor type
         FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
    +    String indexPostfix = dateFormat.format(new Date());
    +    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    +
    +    // the number of tuples must match the number of messages
    +    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
    +    int batchSize = tuples.size();
    +    if(messages.size() != batchSize) {
    +      throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
    +              tuples.size(), messages.size()));
    +    }
     
    -    final String indexPostfix = dateFormat.format(new Date());
    -    BulkRequest bulkRequest = new BulkRequest();
    -    for(JSONObject message: messages) {
    +    // create a document from each message
    +    List<TupleBasedDocument> documents = new ArrayList<>();
    +    for(int i=0; i<tuples.size(); i++) {
    +      JSONObject message = messages.get(i);
    +      Tuple tuple = tuples.get(i);
     
    -      JSONObject esDoc = new JSONObject();
    +      // transform the message fields to the source fields of the indexed document
    +      JSONObject source = new JSONObject();
           for(Object k : message.keySet()){
    -        copyField(k.toString(), message, esDoc, fieldNameConverter);
    +        copyField(k.toString(), message, source, fieldNameConverter);
           }
     
    -      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    -      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
    -      indexRequest.source(esDoc.toJSONString());
    -      String guid = (String)esDoc.get(Constants.GUID);
    -      if(guid != null) {
    -        indexRequest.id(guid);
    +      // define the document id
    +      String guid = String.class.cast(source.get(Constants.GUID));
    --- End diff --
    
    Agreed.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r239054925
  
    --- Diff: metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java ---
    @@ -18,170 +18,241 @@
     
     package org.apache.metron.elasticsearch.writer;
     
    -import static org.junit.Assert.assertEquals;
    -import static org.mockito.Mockito.mock;
    -import static org.mockito.Mockito.when;
    +import org.apache.metron.common.Constants;
    +import org.apache.metron.common.configuration.writer.WriterConfiguration;
    +import org.apache.metron.common.writer.BulkWriterResponse;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.tuple.Tuple;
    +import org.json.simple.JSONObject;
    +import org.junit.Before;
    +import org.junit.Test;
     
    -import com.google.common.collect.ImmutableList;
    +import java.util.ArrayList;
     import java.util.Collection;
     import java.util.HashMap;
    +import java.util.List;
     import java.util.Map;
    -import org.apache.metron.common.writer.BulkWriterResponse;
    -import org.apache.storm.tuple.Tuple;
    -import org.elasticsearch.action.bulk.BulkItemResponse;
    -import org.elasticsearch.action.bulk.BulkResponse;
    -import org.junit.Test;
    +import java.util.UUID;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
     
     public class ElasticsearchWriterTest {
    -    @Test
    -    public void testSingleSuccesses() throws Exception {
    -        Tuple tuple1 = mock(Tuple.class);
     
    -        BulkResponse response = mock(BulkResponse.class);
    -        when(response.hasFailures()).thenReturn(false);
    +    Map stormConf;
    +    TopologyContext topologyContext;
    +    WriterConfiguration writerConfiguration;
     
    -        BulkWriterResponse expected = new BulkWriterResponse();
    -        expected.addSuccess(tuple1);
    +    @Before
    +    public void setup() {
    +        topologyContext = mock(TopologyContext.class);
     
    -        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    -        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
    +        writerConfiguration = mock(WriterConfiguration.class);
    +        when(writerConfiguration.getGlobalConfig()).thenReturn(globals());
     
    -        assertEquals("Response should have no errors and single success", expected, actual);
    +        stormConf = new HashMap();
         }
     
         @Test
    -    public void testMultipleSuccesses() throws Exception {
    -        Tuple tuple1 = mock(Tuple.class);
    -        Tuple tuple2 = mock(Tuple.class);
    -
    -        BulkResponse response = mock(BulkResponse.class);
    -        when(response.hasFailures()).thenReturn(false);
    +    public void shouldWriteSuccessfully() {
    +        // create a writer where all writes will be successful
    +        float probabilityOfSuccess = 1.0F;
    +        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    +        esWriter.setDocumentWriter( new BulkDocumentWriterStub<>(probabilityOfSuccess));
    +        esWriter.init(stormConf, topologyContext, writerConfiguration);
     
    -        BulkWriterResponse expected = new BulkWriterResponse();
    -        expected.addSuccess(tuple1);
    -        expected.addSuccess(tuple2);
    +        // create a tuple and a message associated with that tuple
    +        List<Tuple> tuples = createTuples(1);
    +        List<JSONObject> messages = createMessages(1);
     
    -        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    -        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
    +        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
     
    -        assertEquals("Response should have no errors and two successes", expected, actual);
    +        // response should only contain successes
    +        assertFalse(response.hasErrors());
    +        assertTrue(response.getSuccesses().contains(tuples.get(0)));
         }
     
         @Test
    -    public void testSingleFailure() throws Exception {
    -        Tuple tuple1 = mock(Tuple.class);
    -
    -        BulkResponse response = mock(BulkResponse.class);
    -        when(response.hasFailures()).thenReturn(true);
    -
    -        Exception e = new IllegalStateException();
    -        BulkItemResponse itemResponse = buildBulkItemFailure(e);
    -        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse).iterator());
    +    public void shouldWriteManySuccessfully() {
    +        // create a writer where all writes will be successful
    +        float probabilityOfSuccess = 1.0F;
    --- End diff --
    
    Right.  Just needed a way to test successes, failures, and partial successes.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r238881060
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---
    @@ -56,90 +60,111 @@
        */
       private transient ElasticsearchClient client;
     
    +  /**
    +   * Responsible for writing documents.
    +   *
    +   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
    +   * a {@link Tuple} and the document created from the contents of that tuple. If
    +   * a document cannot be written, the associated tuple needs to be failed.
    +   */
    +  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
    +
       /**
        * A simple data formatter used to build the appropriate Elasticsearch index name.
        */
       private SimpleDateFormat dateFormat;
     
    -
       @Override
       public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
    -
         Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
    -    client = ElasticsearchClientFactory.create(globalConfiguration);
         dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
    +
    +    // only create the document writer, if one does not already exist. useful for testing.
    +    if(documentWriter == null) {
    +      client = ElasticsearchClientFactory.create(globalConfiguration);
    +      documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
    +    }
       }
     
       @Override
    -  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
    +  public BulkWriterResponse write(String sensorType,
    +                                  WriterConfiguration configurations,
    +                                  Iterable<Tuple> tuplesIter,
    +                                  List<JSONObject> messages) {
     
         // fetch the field name converter for this sensor type
         FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
    +    String indexPostfix = dateFormat.format(new Date());
    +    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    +
    +    // the number of tuples must match the number of messages
    +    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
    +    int batchSize = tuples.size();
    +    if(messages.size() != batchSize) {
    +      throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
    +              tuples.size(), messages.size()));
    +    }
     
    -    final String indexPostfix = dateFormat.format(new Date());
    -    BulkRequest bulkRequest = new BulkRequest();
    -    for(JSONObject message: messages) {
    +    // create a document from each message
    +    List<TupleBasedDocument> documents = new ArrayList<>();
    +    for(int i=0; i<tuples.size(); i++) {
    +      JSONObject message = messages.get(i);
    +      Tuple tuple = tuples.get(i);
     
    -      JSONObject esDoc = new JSONObject();
    +      // transform the message fields to the source fields of the indexed document
    +      JSONObject source = new JSONObject();
           for(Object k : message.keySet()){
    -        copyField(k.toString(), message, esDoc, fieldNameConverter);
    +        copyField(k.toString(), message, source, fieldNameConverter);
           }
     
    -      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    -      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
    -      indexRequest.source(esDoc.toJSONString());
    -      String guid = (String)esDoc.get(Constants.GUID);
    -      if(guid != null) {
    -        indexRequest.id(guid);
    +      // define the document id
    +      String guid = String.class.cast(source.get(Constants.GUID));
    --- End diff --
    
    Should be using https://github.com/apache/metron/blob/master/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java#L39 for conversions.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r236738173
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.elasticsearch.bulk;
    +
    +import org.apache.commons.lang3.exception.ExceptionUtils;
    +import org.apache.metron.elasticsearch.client.ElasticsearchClient;
    +import org.apache.metron.indexing.dao.update.Document;
    +import org.elasticsearch.action.DocWriteRequest;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkRequest;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.action.index.IndexRequest;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.lang.invoke.MethodHandles;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Writes documents to an Elasticsearch index in bulk.
    + *
    + * @param <D> The type of document to write.
    + */
    +public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> {
    +
    +    /**
    +     * A {@link Document} along with the index it will be written to.
    +     */
    +    private class Indexable {
    +        D document;
    +        String index;
    +
    +        public Indexable(D document, String index) {
    +            this.document = document;
    +            this.index = index;
    +        }
    +    }
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    +    private Optional<SuccessListener> onSuccess;
    +    private Optional<FailureListener> onFailure;
    +    private ElasticsearchClient client;
    +    private List<Indexable> documents;
    +
    +    public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) {
    +        this.client = client;
    +        this.onSuccess = Optional.empty();
    +        this.onFailure = Optional.empty();
    +        this.documents = new ArrayList<>();
    +    }
    +
    +    @Override
    +    public void onSuccess(SuccessListener<D> onSuccess) {
    +        this.onSuccess = Optional.of(onSuccess);
    +    }
    +
    +    @Override
    +    public void onFailure(FailureListener<D> onFailure) {
    +        this.onFailure = Optional.of(onFailure);
    +    }
    +
    +    @Override
    +    public void addDocument(D document, String index) {
    +        documents.add(new Indexable(document, index));
    +        LOG.debug("Adding document to batch; document={}, index={}", document, index);
    +    }
    +
    +    @Override
    +    public void write() {
    +        try {
    +            // create an index request for each document
    +            List<DocWriteRequest> requests = documents
    --- End diff --
    
    I usually think of us as IO bound, rather than compute, in Indexing.  But I don't mind refactoring out the use of streams, just to avoid any potential problems.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r238876546
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java ---
    @@ -0,0 +1,79 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.elasticsearch.bulk;
    +
    +import org.apache.metron.indexing.dao.update.Document;
    +
    +import java.util.List;
    +
    +/**
    + * Writes documents to an index in bulk.
    + *
    + * <p>Partial failures within a batch can be handled individually by registering
    + * a {@link FailureListener}.
    + *
    + * @param <D> The type of document to write.
    + */
    +public interface BulkDocumentWriter<D extends Document> {
    +
    +    /**
    +     * A listener that is notified when a set of documents have been
    +     * written successfully.
    +     * @param <D> The type of document to write.
    +     */
    +    interface SuccessListener<D extends Document> {
    --- End diff --
    
    How's this compare with the recent changes for decoupling the Parser Bolts from parser exec? At a glance, this seems very similar from an abstraction standpoint. Any reason we would want callbacks here as compared with the decisions we made for the parser bolt being more direct?
    
    https://github.com/apache/metron/pull/1213#issuecomment-427086826


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r239065568
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---
    @@ -196,7 +196,7 @@ public ElasticsearchDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPoli
       }
     
       protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
    -    return updateDao.getIndexName(guid, sensorType);
    +    return updateDao.findIndexNameByGUID(guid, sensorType);
    --- End diff --
    
    > Also, would we want any parity between the updateDao's find method name vs the ElasticsearchDao's getIndexName method name?
    
    I found the [code here confusing](https://github.com/apache/metron/blob/89a2beda4f07911c8b3cd7dee8a2c3426838d161/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java#L195-L197) and had me stuck on an issue for quite some time.  When both use `getIndexName` I have no idea what the logic is doing.  It tries one approach, then falls back to another, but since the methods are named the same, it doesn't tell me how they attempt to find the index name in a different way.
    
    With the rename, I feel it improves understanding in a glance [what this is doing now](https://github.com/apache/metron/blob/260ccc366b79ef53595dbfd097066040444b4eda/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java#L179) and the differences between the primary approach versus the fallback.
    
    
    > Is sensorType not a component to retrieving the index name? 
    
    So you prefer the original function name?  Or you prefer `lookupIndexName`, `findIndexNameByGUIDAndSensor`?



---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r238876879
  
    --- Diff: metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java ---
    @@ -89,46 +91,29 @@ public void setGuid(String guid) {
         this.guid = guid;
       }
     
    -  @Override
    -  public String toString() {
    -    return "Document{" +
    -        "timestamp=" + timestamp +
    -        ", document=" + document +
    -        ", guid='" + guid + '\'' +
    -        ", sensorType='" + sensorType + '\'' +
    -        '}';
    -  }
    -
       @Override
       public boolean equals(Object o) {
    -    if (this == o) {
    -      return true;
    -    }
    -    if (o == null || getClass() != o.getClass()) {
    -      return false;
    -    }
    -
    +    if (this == o) return true;
    +    if (!(o instanceof Document)) return false;
         Document document1 = (Document) o;
    -
    -    if (timestamp != null ? !timestamp.equals(document1.timestamp) : document1.timestamp != null) {
    -      return false;
    -    }
    -    if (document != null ? !document.equals(document1.document) : document1.document != null) {
    -      return false;
    -    }
    -    if (guid != null ? !guid.equals(document1.guid) : document1.guid != null) {
    -      return false;
    -    }
    -    return sensorType != null ? sensorType.equals(document1.sensorType)
    -        : document1.sensorType == null;
    +    return Objects.equals(timestamp, document1.timestamp) &&
    --- End diff --
    
    Is this change to equals and hashcode an auto-create from IntelliJ?


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r236844166
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---
    @@ -56,90 +58,107 @@
        */
       private transient ElasticsearchClient client;
     
    +  /**
    +   * Responsible for writing documents.
    +   *
    +   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
    +   * a {@link Tuple} and the document created from the contents of that tuple. If
    +   * a document cannot be written, the associated tuple needs to be failed.
    +   */
    +  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
    +
       /**
        * A simple data formatter used to build the appropriate Elasticsearch index name.
        */
       private SimpleDateFormat dateFormat;
     
    -
       @Override
       public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
    -
         Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
    -    client = ElasticsearchClientFactory.create(globalConfiguration);
         dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
    +
    +    // only create the document writer, if one does not already exist. useful for testing.
    +    if(documentWriter == null) {
    +      client = ElasticsearchClientFactory.create(globalConfiguration);
    +      documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
    +    }
       }
     
       @Override
    -  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
    +  public BulkWriterResponse write(String sensorType,
    +                                  WriterConfiguration configurations,
    +                                  Iterable<Tuple> tuplesIter,
    +                                  List<JSONObject> messages) {
     
         // fetch the field name converter for this sensor type
         FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
    +    String indexPostfix = dateFormat.format(new Date());
    +    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    +
    +    // the number of tuples must match the number of messages
    +    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
    +    int batchSize = tuples.size();
    +    if(messages.size() != batchSize) {
    +      throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
    +              tuples.size(), messages.size()));
    +    }
     
    -    final String indexPostfix = dateFormat.format(new Date());
    -    BulkRequest bulkRequest = new BulkRequest();
    -    for(JSONObject message: messages) {
    +    // create a document from each message
    +    List<TupleBasedDocument> documents = new ArrayList<>();
    +    for(int i=0; i<tuples.size(); i++) {
    +      JSONObject message = messages.get(i);
    +      Tuple tuple = tuples.get(i);
     
    -      JSONObject esDoc = new JSONObject();
    +      // transform the message fields to the source fields of the indexed document
    +      JSONObject source = new JSONObject();
           for(Object k : message.keySet()){
    -        copyField(k.toString(), message, esDoc, fieldNameConverter);
    +        copyField(k.toString(), message, source, fieldNameConverter);
           }
     
    -      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    -      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
    -      indexRequest.source(esDoc.toJSONString());
    -      String guid = (String)esDoc.get(Constants.GUID);
    -      if(guid != null) {
    -        indexRequest.id(guid);
    +      // define the document id
    +      String guid = String.class.cast(source.get(Constants.GUID));
    +      if(guid == null) {
    +        LOG.info("Missing '{}' field; document ID will be auto-generated.", Constants.GUID);
           }
     
    -      Object ts = esDoc.get("timestamp");
    -      if(ts != null) {
    -        indexRequest.timestamp(ts.toString());
    +      // define the document timestamp
    +      Long timestamp = Long.class.cast(source.get(Constants.Fields.TIMESTAMP.getName()));
    +      if(timestamp == null) {
    +        LOG.info("Missing '{}' field; timestamp will be set to system time.", Constants.Fields.TIMESTAMP.getName());
           }
    -      bulkRequest.add(indexRequest);
    +
    +      TupleBasedDocument document = new TupleBasedDocument(source, guid, sensorType, timestamp, tuple);
    +      documentWriter.addDocument(document, indexName);
         }
     
    -    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
    -    return buildWriteReponse(tuples, bulkResponse);
    +    // add successful tuples to the response
    +    BulkWriterResponse response = new BulkWriterResponse();
    +    documentWriter.onSuccess(docs -> {
    +      List<Tuple> successfulTuples = docs.stream().map(doc -> doc.getTuple()).collect(Collectors.toList());
    --- End diff --
    
    Done too. Thanks.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r238882021
  
    --- Diff: metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java ---
    @@ -18,170 +18,241 @@
     
     package org.apache.metron.elasticsearch.writer;
     
    -import static org.junit.Assert.assertEquals;
    -import static org.mockito.Mockito.mock;
    -import static org.mockito.Mockito.when;
    +import org.apache.metron.common.Constants;
    +import org.apache.metron.common.configuration.writer.WriterConfiguration;
    +import org.apache.metron.common.writer.BulkWriterResponse;
    +import org.apache.storm.task.TopologyContext;
    +import org.apache.storm.tuple.Tuple;
    +import org.json.simple.JSONObject;
    +import org.junit.Before;
    +import org.junit.Test;
     
    -import com.google.common.collect.ImmutableList;
    +import java.util.ArrayList;
     import java.util.Collection;
     import java.util.HashMap;
    +import java.util.List;
     import java.util.Map;
    -import org.apache.metron.common.writer.BulkWriterResponse;
    -import org.apache.storm.tuple.Tuple;
    -import org.elasticsearch.action.bulk.BulkItemResponse;
    -import org.elasticsearch.action.bulk.BulkResponse;
    -import org.junit.Test;
    +import java.util.UUID;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
     
     public class ElasticsearchWriterTest {
    -    @Test
    -    public void testSingleSuccesses() throws Exception {
    -        Tuple tuple1 = mock(Tuple.class);
     
    -        BulkResponse response = mock(BulkResponse.class);
    -        when(response.hasFailures()).thenReturn(false);
    +    Map stormConf;
    +    TopologyContext topologyContext;
    +    WriterConfiguration writerConfiguration;
     
    -        BulkWriterResponse expected = new BulkWriterResponse();
    -        expected.addSuccess(tuple1);
    +    @Before
    +    public void setup() {
    +        topologyContext = mock(TopologyContext.class);
     
    -        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    -        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
    +        writerConfiguration = mock(WriterConfiguration.class);
    +        when(writerConfiguration.getGlobalConfig()).thenReturn(globals());
     
    -        assertEquals("Response should have no errors and single success", expected, actual);
    +        stormConf = new HashMap();
         }
     
         @Test
    -    public void testMultipleSuccesses() throws Exception {
    -        Tuple tuple1 = mock(Tuple.class);
    -        Tuple tuple2 = mock(Tuple.class);
    -
    -        BulkResponse response = mock(BulkResponse.class);
    -        when(response.hasFailures()).thenReturn(false);
    +    public void shouldWriteSuccessfully() {
    +        // create a writer where all writes will be successful
    +        float probabilityOfSuccess = 1.0F;
    +        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    +        esWriter.setDocumentWriter( new BulkDocumentWriterStub<>(probabilityOfSuccess));
    +        esWriter.init(stormConf, topologyContext, writerConfiguration);
     
    -        BulkWriterResponse expected = new BulkWriterResponse();
    -        expected.addSuccess(tuple1);
    -        expected.addSuccess(tuple2);
    +        // create a tuple and a message associated with that tuple
    +        List<Tuple> tuples = createTuples(1);
    +        List<JSONObject> messages = createMessages(1);
     
    -        ElasticsearchWriter esWriter = new ElasticsearchWriter();
    -        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
    +        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
     
    -        assertEquals("Response should have no errors and two successes", expected, actual);
    +        // response should only contain successes
    +        assertFalse(response.hasErrors());
    +        assertTrue(response.getSuccesses().contains(tuples.get(0)));
         }
     
         @Test
    -    public void testSingleFailure() throws Exception {
    -        Tuple tuple1 = mock(Tuple.class);
    -
    -        BulkResponse response = mock(BulkResponse.class);
    -        when(response.hasFailures()).thenReturn(true);
    -
    -        Exception e = new IllegalStateException();
    -        BulkItemResponse itemResponse = buildBulkItemFailure(e);
    -        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse).iterator());
    +    public void shouldWriteManySuccessfully() {
    +        // create a writer where all writes will be successful
    +        float probabilityOfSuccess = 1.0F;
    --- End diff --
    
    Hm, interesting - so the stub will definitively fail some percentage of tuples for all POS vals < 1.0F?


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r236530989
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---
    @@ -56,90 +58,107 @@
        */
       private transient ElasticsearchClient client;
     
    +  /**
    +   * Responsible for writing documents.
    +   *
    +   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
    +   * a {@link Tuple} and the document created from the contents of that tuple. If
    +   * a document cannot be written, the associated tuple needs to be failed.
    +   */
    +  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
    +
       /**
        * A simple data formatter used to build the appropriate Elasticsearch index name.
        */
       private SimpleDateFormat dateFormat;
     
    -
       @Override
       public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
    -
         Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
    -    client = ElasticsearchClientFactory.create(globalConfiguration);
         dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
    +
    +    // only create the document writer, if one does not already exist. useful for testing.
    +    if(documentWriter == null) {
    +      client = ElasticsearchClientFactory.create(globalConfiguration);
    +      documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
    +    }
       }
     
       @Override
    -  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
    +  public BulkWriterResponse write(String sensorType,
    +                                  WriterConfiguration configurations,
    +                                  Iterable<Tuple> tuplesIter,
    +                                  List<JSONObject> messages) {
     
         // fetch the field name converter for this sensor type
         FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
    +    String indexPostfix = dateFormat.format(new Date());
    +    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    +
    +    // the number of tuples must match the number of messages
    +    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
    +    int batchSize = tuples.size();
    +    if(messages.size() != batchSize) {
    +      throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
    +              tuples.size(), messages.size()));
    +    }
     
    -    final String indexPostfix = dateFormat.format(new Date());
    -    BulkRequest bulkRequest = new BulkRequest();
    -    for(JSONObject message: messages) {
    +    // create a document from each message
    +    List<TupleBasedDocument> documents = new ArrayList<>();
    +    for(int i=0; i<tuples.size(); i++) {
    +      JSONObject message = messages.get(i);
    +      Tuple tuple = tuples.get(i);
     
    -      JSONObject esDoc = new JSONObject();
    +      // transform the message fields to the source fields of the indexed document
    +      JSONObject source = new JSONObject();
           for(Object k : message.keySet()){
    -        copyField(k.toString(), message, esDoc, fieldNameConverter);
    +        copyField(k.toString(), message, source, fieldNameConverter);
           }
     
    -      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    -      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
    -      indexRequest.source(esDoc.toJSONString());
    -      String guid = (String)esDoc.get(Constants.GUID);
    -      if(guid != null) {
    -        indexRequest.id(guid);
    +      // define the document id
    +      String guid = String.class.cast(source.get(Constants.GUID));
    +      if(guid == null) {
    +        LOG.info("Missing '{}' field; document ID will be auto-generated.", Constants.GUID);
           }
     
    -      Object ts = esDoc.get("timestamp");
    -      if(ts != null) {
    -        indexRequest.timestamp(ts.toString());
    +      // define the document timestamp
    +      Long timestamp = Long.class.cast(source.get(Constants.Fields.TIMESTAMP.getName()));
    +      if(timestamp == null) {
    +        LOG.info("Missing '{}' field; timestamp will be set to system time.", Constants.Fields.TIMESTAMP.getName());
           }
    -      bulkRequest.add(indexRequest);
    +
    +      TupleBasedDocument document = new TupleBasedDocument(source, guid, sensorType, timestamp, tuple);
    +      documentWriter.addDocument(document, indexName);
         }
     
    -    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
    -    return buildWriteReponse(tuples, bulkResponse);
    +    // add successful tuples to the response
    +    BulkWriterResponse response = new BulkWriterResponse();
    +    documentWriter.onSuccess(docs -> {
    +      List<Tuple> successfulTuples = docs.stream().map(doc -> doc.getTuple()).collect(Collectors.toList());
    --- End diff --
    
    Same comment on streams as above.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r236843942
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.elasticsearch.bulk;
    +
    +import org.apache.commons.lang3.exception.ExceptionUtils;
    +import org.apache.metron.elasticsearch.client.ElasticsearchClient;
    +import org.apache.metron.indexing.dao.update.Document;
    +import org.elasticsearch.action.DocWriteRequest;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkRequest;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.action.index.IndexRequest;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.lang.invoke.MethodHandles;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Writes documents to an Elasticsearch index in bulk.
    + *
    + * @param <D> The type of document to write.
    + */
    +public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> {
    +
    +    /**
    +     * A {@link Document} along with the index it will be written to.
    +     */
    +    private class Indexable {
    +        D document;
    +        String index;
    +
    +        public Indexable(D document, String index) {
    +            this.document = document;
    +            this.index = index;
    +        }
    +    }
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    +    private Optional<SuccessListener> onSuccess;
    +    private Optional<FailureListener> onFailure;
    +    private ElasticsearchClient client;
    +    private List<Indexable> documents;
    +
    +    public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) {
    +        this.client = client;
    +        this.onSuccess = Optional.empty();
    +        this.onFailure = Optional.empty();
    +        this.documents = new ArrayList<>();
    +    }
    +
    +    @Override
    +    public void onSuccess(SuccessListener<D> onSuccess) {
    +        this.onSuccess = Optional.of(onSuccess);
    +    }
    +
    +    @Override
    +    public void onFailure(FailureListener<D> onFailure) {
    +        this.onFailure = Optional.of(onFailure);
    +    }
    +
    +    @Override
    +    public void addDocument(D document, String index) {
    +        documents.add(new Indexable(document, index));
    +        LOG.debug("Adding document to batch; document={}, index={}", document, index);
    +    }
    +
    +    @Override
    +    public void write() {
    +        try {
    +            // create an index request for each document
    +            List<DocWriteRequest> requests = documents
    --- End diff --
    
    It turned out to be much cleaner to get rid of the streams.  So double-win.  Thanks.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r239054070
  
    --- Diff: metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/Document.java ---
    @@ -89,46 +91,29 @@ public void setGuid(String guid) {
         this.guid = guid;
       }
     
    -  @Override
    -  public String toString() {
    -    return "Document{" +
    -        "timestamp=" + timestamp +
    -        ", document=" + document +
    -        ", guid='" + guid + '\'' +
    -        ", sensorType='" + sensorType + '\'' +
    -        '}';
    -  }
    -
       @Override
       public boolean equals(Object o) {
    -    if (this == o) {
    -      return true;
    -    }
    -    if (o == null || getClass() != o.getClass()) {
    -      return false;
    -    }
    -
    +    if (this == o) return true;
    +    if (!(o instanceof Document)) return false;
         Document document1 = (Document) o;
    -
    -    if (timestamp != null ? !timestamp.equals(document1.timestamp) : document1.timestamp != null) {
    -      return false;
    -    }
    -    if (document != null ? !document.equals(document1.document) : document1.document != null) {
    -      return false;
    -    }
    -    if (guid != null ? !guid.equals(document1.guid) : document1.guid != null) {
    -      return false;
    -    }
    -    return sensorType != null ? sensorType.equals(document1.sensorType)
    -        : document1.sensorType == null;
    +    return Objects.equals(timestamp, document1.timestamp) &&
    --- End diff --
    
    It is an auto-create from IntelliJ.  It changed when on a previous iteration, I added a field to the Document class.  I since backed that out and went with another approach.  So there really is no need for this to change now.  I will back this out.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r236530583
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.elasticsearch.bulk;
    +
    +import org.apache.commons.lang3.exception.ExceptionUtils;
    +import org.apache.metron.elasticsearch.client.ElasticsearchClient;
    +import org.apache.metron.indexing.dao.update.Document;
    +import org.elasticsearch.action.DocWriteRequest;
    +import org.elasticsearch.action.bulk.BulkItemResponse;
    +import org.elasticsearch.action.bulk.BulkRequest;
    +import org.elasticsearch.action.bulk.BulkResponse;
    +import org.elasticsearch.action.index.IndexRequest;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.lang.invoke.MethodHandles;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.stream.Collectors;
    +
    +/**
    + * Writes documents to an Elasticsearch index in bulk.
    + *
    + * @param <D> The type of document to write.
    + */
    +public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> {
    +
    +    /**
    +     * A {@link Document} along with the index it will be written to.
    +     */
    +    private class Indexable {
    +        D document;
    +        String index;
    +
    +        public Indexable(D document, String index) {
    +            this.document = document;
    +            this.index = index;
    +        }
    +    }
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    +    private Optional<SuccessListener> onSuccess;
    +    private Optional<FailureListener> onFailure;
    +    private ElasticsearchClient client;
    +    private List<Indexable> documents;
    +
    +    public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) {
    +        this.client = client;
    +        this.onSuccess = Optional.empty();
    +        this.onFailure = Optional.empty();
    +        this.documents = new ArrayList<>();
    +    }
    +
    +    @Override
    +    public void onSuccess(SuccessListener<D> onSuccess) {
    +        this.onSuccess = Optional.of(onSuccess);
    +    }
    +
    +    @Override
    +    public void onFailure(FailureListener<D> onFailure) {
    +        this.onFailure = Optional.of(onFailure);
    +    }
    +
    +    @Override
    +    public void addDocument(D document, String index) {
    +        documents.add(new Indexable(document, index));
    +        LOG.debug("Adding document to batch; document={}, index={}", document, index);
    +    }
    +
    +    @Override
    +    public void write() {
    +        try {
    +            // create an index request for each document
    +            List<DocWriteRequest> requests = documents
    --- End diff --
    
    Please be careful using streams. In general, but specifically here. Our ES endpoints are typically where we bottleneck in any tuning exercise and adding additional overhead may prove troublesome. I'd normally not stress too much on "premature optimization" but I think we're safely very much post-premature at this stage of the game with our Lucene stores.
    
    https://jaxenter.com/java-performance-tutorial-how-fast-are-the-java-8-streams-118830.html


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by nickwallen <gi...@git.apache.org>.
Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r239087246
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java ---
    @@ -0,0 +1,79 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.metron.elasticsearch.bulk;
    +
    +import org.apache.metron.indexing.dao.update.Document;
    +
    +import java.util.List;
    +
    +/**
    + * Writes documents to an index in bulk.
    + *
    + * <p>Partial failures within a batch can be handled individually by registering
    + * a {@link FailureListener}.
    + *
    + * @param <D> The type of document to write.
    + */
    +public interface BulkDocumentWriter<D extends Document> {
    +
    +    /**
    +     * A listener that is notified when a set of documents have been
    +     * written successfully.
    +     * @param <D> The type of document to write.
    +     */
    +    interface SuccessListener<D extends Document> {
    --- End diff --
    
    Yep, I think you're right.  Wish I would have noticed this before. Daft.


---

[GitHub] metron pull request #1254: METRON-1849 Elasticsearch Index Write Functionali...

Posted by mmiklavc <gi...@git.apache.org>.
Github user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1254#discussion_r238883436
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---
    @@ -196,7 +196,7 @@ public ElasticsearchDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPoli
       }
     
       protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
    -    return updateDao.getIndexName(guid, sensorType);
    +    return updateDao.findIndexNameByGUID(guid, sensorType);
    --- End diff --
    
    Is sensorType not a component to retrieving the index name? Also, would we want any parity between the updateDao's find method name vs the ElasticsearchDao's getIndexName method name?


---