Posted to issues@metron.apache.org by nickwallen <gi...@git.apache.org> on 2018/11/30 13:03:27 UTC
[GitHub] metron pull request #1269: METRON-1879 Allow Elasticsearch to Auto-Generate ...
Github user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/1269#discussion_r237852978
--- Diff: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---
@@ -56,90 +58,107 @@
*/
private transient ElasticsearchClient client;
+ /**
+ * Responsible for writing documents.
+ *
+ * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
+ * a {@link Tuple} and the document created from the contents of that tuple. If
+ * a document cannot be written, the associated tuple needs to be failed.
+ */
+ private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
+
/**
* A simple data formatter used to build the appropriate Elasticsearch index name.
*/
private SimpleDateFormat dateFormat;
-
@Override
public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
-
Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
- client = ElasticsearchClientFactory.create(globalConfiguration);
dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
+
+ // only create the document writer, if one does not already exist. useful for testing.
+ if(documentWriter == null) {
+ client = ElasticsearchClientFactory.create(globalConfiguration);
+ documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
+ }
}
@Override
- public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
+ public BulkWriterResponse write(String sensorType,
+ WriterConfiguration configurations,
+ Iterable<Tuple> tuplesIter,
+ List<JSONObject> messages) {
// fetch the field name converter for this sensor type
FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
+ String indexPostfix = dateFormat.format(new Date());
+ String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
+
+ // the number of tuples must match the number of messages
+ List<Tuple> tuples = Lists.newArrayList(tuplesIter);
+ int batchSize = tuples.size();
+ if(messages.size() != batchSize) {
+ throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
+ tuples.size(), messages.size()));
+ }
- final String indexPostfix = dateFormat.format(new Date());
- BulkRequest bulkRequest = new BulkRequest();
- for(JSONObject message: messages) {
+ // create a document from each message
+ List<TupleBasedDocument> documents = new ArrayList<>();
+ for(int i=0; i<tuples.size(); i++) {
+ JSONObject message = messages.get(i);
+ Tuple tuple = tuples.get(i);
- JSONObject esDoc = new JSONObject();
+ // transform the message fields to the source fields of the indexed document
+ JSONObject source = new JSONObject();
for(Object k : message.keySet()){
- copyField(k.toString(), message, esDoc, fieldNameConverter);
+ copyField(k.toString(), message, source, fieldNameConverter);
}
- String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
- IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
- indexRequest.source(esDoc.toJSONString());
- String guid = (String)esDoc.get(Constants.GUID);
- if(guid != null) {
- indexRequest.id(guid);
+ // define the document id
+ String guid = String.class.cast(source.get(Constants.GUID));
+ if(guid == null) {
+ LOG.info("Missing '{}' field; document ID will be auto-generated.", Constants.GUID);
}
- Object ts = esDoc.get("timestamp");
- if(ts != null) {
- indexRequest.timestamp(ts.toString());
+ // define the document timestamp
+ Long timestamp = Long.class.cast(source.get(Constants.Fields.TIMESTAMP.getName()));
--- End diff --
@anandsubbu pointed out to me that this previously worked when the timestamp was a String. The code now requires a Long, so a String timestamp would fail the `Long.class.cast(...)` call with a ClassCastException. I need to address this, although I am not yet sure what the right behavior should be. I need to do a little research. Thanks, @anandsubbu!
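
One possible direction, as a minimal sketch only: coerce the field value instead of casting it, so that both numeric and String timestamps are accepted. The class `TimestampCoercion` and its method `toTimestamp` are hypothetical names used for illustration and are not part of Metron. Returning null for a missing or unparseable value is also an assumption, not a decision made in this PR.

```java
/**
 * Hypothetical helper, not part of Metron. Shows one way to accept a
 * timestamp that arrives either as a number or as a numeric String.
 */
final class TimestampCoercion {

  private TimestampCoercion() {
    // static helper only; no instances
  }

  /**
   * Converts a raw field value to epoch milliseconds.
   *
   * @param value The raw value read from the message; may be null.
   * @return The timestamp as a Long, or null if the value is missing or
   *         cannot be read as a whole number (an assumed policy).
   */
  static Long toTimestamp(Object value) {
    if (value instanceof Number) {
      // covers Long, Integer, etc.; Long.class.cast would reject an Integer
      return ((Number) value).longValue();
    }
    if (value instanceof String) {
      try {
        return Long.parseLong(((String) value).trim());
      } catch (NumberFormatException e) {
        // not a whole number; treat it the same as a missing timestamp
        return null;
      }
    }
    // null or an unsupported type
    return null;
  }
}
```

Under these assumptions, the cast in the diff could become `Long timestamp = TimestampCoercion.toTimestamp(source.get(Constants.Fields.TIMESTAMP.getName()));`. Whether an unparseable timestamp should instead fail the tuple is a separate question that this sketch does not settle.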
---