Posted to issues@metron.apache.org by nickwallen <gi...@git.apache.org> on 2018/11/30 13:03:27 UTC

[GitHub] metron pull request #1269: METRON-1879 Allow Elasticsearch to Auto-Generate ...

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1269#discussion_r237852978
  
    --- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---
    @@ -56,90 +58,107 @@
        */
       private transient ElasticsearchClient client;
     
    +  /**
    +   * Responsible for writing documents.
    +   *
    +   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
    +   * a {@link Tuple} and the document created from the contents of that tuple. If
    +   * a document cannot be written, the associated tuple needs to be failed.
    +   */
    +  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
    +
       /**
        * A simple data formatter used to build the appropriate Elasticsearch index name.
        */
       private SimpleDateFormat dateFormat;
     
    -
       @Override
       public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
    -
         Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
    -    client = ElasticsearchClientFactory.create(globalConfiguration);
         dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
    +
    +    // only create the document writer, if one does not already exist. useful for testing.
    +    if(documentWriter == null) {
    +      client = ElasticsearchClientFactory.create(globalConfiguration);
    +      documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
    +    }
       }
     
       @Override
    -  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
    +  public BulkWriterResponse write(String sensorType,
    +                                  WriterConfiguration configurations,
    +                                  Iterable<Tuple> tuplesIter,
    +                                  List<JSONObject> messages) {
     
         // fetch the field name converter for this sensor type
         FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
    +    String indexPostfix = dateFormat.format(new Date());
    +    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    +
    +    // the number of tuples must match the number of messages
    +    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
    +    int batchSize = tuples.size();
    +    if(messages.size() != batchSize) {
    +      throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
    +              tuples.size(), messages.size()));
    +    }
     
    -    final String indexPostfix = dateFormat.format(new Date());
    -    BulkRequest bulkRequest = new BulkRequest();
    -    for(JSONObject message: messages) {
    +    // create a document from each message
    +    List<TupleBasedDocument> documents = new ArrayList<>();
    +    for(int i=0; i<tuples.size(); i++) {
    +      JSONObject message = messages.get(i);
    +      Tuple tuple = tuples.get(i);
     
    -      JSONObject esDoc = new JSONObject();
    +      // transform the message fields to the source fields of the indexed document
    +      JSONObject source = new JSONObject();
           for(Object k : message.keySet()){
    -        copyField(k.toString(), message, esDoc, fieldNameConverter);
    +        copyField(k.toString(), message, source, fieldNameConverter);
           }
     
    -      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
    -      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
    -      indexRequest.source(esDoc.toJSONString());
    -      String guid = (String)esDoc.get(Constants.GUID);
    -      if(guid != null) {
    -        indexRequest.id(guid);
    +      // define the document id
    +      String guid = String.class.cast(source.get(Constants.GUID));
    +      if(guid == null) {
    +        LOG.info("Missing '{}' field; document ID will be auto-generated.", Constants.GUID);
           }
     
    -      Object ts = esDoc.get("timestamp");
    -      if(ts != null) {
    -        indexRequest.timestamp(ts.toString());
    +      // define the document timestamp
    +      Long timestamp = Long.class.cast(source.get(Constants.Fields.TIMESTAMP.getName()));
    --- End diff --
    
    @anandsubbu pointed out to me that this previously worked when the timestamp was a String, whereas the code now requires a Long. I need to address this, but I am not yet sure what the right fix is, so I need to do a little research first. Thanks, @anandsubbu!
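
For context on the point above: `Long.class.cast(...)` returns null for a null value but throws a `ClassCastException` for any non-null value that is not a `Long`, including a numeric `String`. The following is only a minimal sketch of one way both forms might be accepted, not the fix adopted in this pull request. The class `TimestampCoercion` and the method `toEpochMillis` are hypothetical names introduced for illustration and do not exist in Metron; the literal key `"timestamp"` stands in for `Constants.Fields.TIMESTAMP.getName()`.

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampCoercion {

  /**
   * Hypothetical helper (not part of Metron): returns the value as a Long when
   * it is a Number or a numeric String; returns null when it is missing,
   * non-numeric, or of an unsupported type.
   */
  public static Long toEpochMillis(Object value) {
    if (value instanceof Number) {
      // covers Long as well as Integer, which Long.class.cast would reject
      return ((Number) value).longValue();
    }
    if (value instanceof String) {
      try {
        return Long.parseLong(((String) value).trim());
      } catch (NumberFormatException e) {
        // not a numeric string; the caller decides how to handle a missing timestamp
        return null;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // a plain Map stands in for the JSONObject 'source' used in the diff
    Map<String, Object> source = new HashMap<>();
    source.put("timestamp", "1543582800000");
    System.out.println(toEpochMillis(source.get("timestamp")));
  }
}
```

Returning null rather than throwing leaves it to the caller to decide whether a document without a usable timestamp should still be indexed; that choice is an assumption of this sketch, not something the diff specifies.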


---