You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/12/11 20:24:25 UTC
[metron] branch master updated: METRON-1849 Elasticsearch Index
Write Functionality Should be Shared (nickwallen) closes apache/metron#1254
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new ec3b98f METRON-1849 Elasticsearch Index Write Functionality Should be Shared (nickwallen) closes apache/metron#1254
ec3b98f is described below
commit ec3b98f762ce3726ed9a33abdb446957d1865dca
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Tue Dec 11 12:59:08 2018 -0500
METRON-1849 Elasticsearch Index Write Functionality Should be Shared (nickwallen) closes apache/metron#1254
---
.../elasticsearch/bulk/BulkDocumentWriter.java | 45 +++
.../bulk/BulkDocumentWriterResults.java | 68 ++++
.../bulk/ElasticsearchBulkDocumentWriter.java | 166 ++++++++++
.../metron/elasticsearch/bulk/WriteFailure.java | 48 +++
.../metron/elasticsearch/bulk/WriteSuccess.java | 36 +++
.../metron/elasticsearch/dao/ElasticsearchDao.java | 2 +-
.../elasticsearch/dao/ElasticsearchUpdateDao.java | 144 ++++-----
.../elasticsearch/writer/ElasticsearchWriter.java | 157 +++++----
.../elasticsearch/writer/TupleBasedDocument.java | 44 +++
.../bulk/ElasticsearchBulkDocumentWriterTest.java | 178 ++++++++++
.../components/ElasticSearchComponent.java | 46 +--
.../writer/ElasticsearchWriterTest.java | 360 ++++++++++++++-------
.../src/test/resources/log4j.properties | 0
13 files changed, 994 insertions(+), 300 deletions(-)
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java
new file mode 100644
index 0000000..34f543e
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+/**
+ * Writes documents to an index in bulk.
+ *
+ * <p>Documents are accumulated into a batch by {@link #addDocument} and are
+ * written together when {@link #write()} is called.
+ *
+ * @param <D> The type of document to write.
+ */
+public interface BulkDocumentWriter<D extends Document> {
+
+ /**
+ * Add a document to the batch. The document is not written until {@link #write()} is called.
+ * @param document The document to write.
+ * @param index The name of the index to write to.
+ */
+ void addDocument(D document, String index);
+
+ /**
+ * Returns the number of documents in the batch.
+ * @return The number of documents waiting to be written.
+ */
+ int size();
+
+ /**
+ * Write all documents in the batch.
+ * @return The outcome of the write: which documents succeeded and which failed.
+ */
+ BulkDocumentWriterResults<D> write();
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java
new file mode 100644
index 0000000..90e5ce3
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The result of writing documents in bulk using a {@link BulkDocumentWriter}.
+ *
+ * <p>Holds a {@link WriteSuccess} for each document that was written and a
+ * {@link WriteFailure} for each document that was not.
+ *
+ * @param <D> The type of documents to write.
+ */
+public class BulkDocumentWriterResults<D extends Document> {
+
+ private final List<WriteSuccess<D>> successes;
+ private final List<WriteFailure<D>> failures;
+
+ public BulkDocumentWriterResults() {
+ this.successes = new ArrayList<>();
+ this.failures = new ArrayList<>();
+ }
+
+ /**
+ * Record a successful write.
+ * @param success The successful write.
+ */
+ public void add(WriteSuccess<D> success) {
+ this.successes.add(success);
+ }
+
+ /**
+ * Record that a document was written successfully.
+ * @param success The document that was written.
+ */
+ public void addSuccess(D success) {
+ add(new WriteSuccess<D>(success));
+ }
+
+ /**
+ * Record that each of the given documents was written successfully.
+ * @param successes The documents that were written.
+ */
+ public void addSuccesses(List<? extends D> successes) {
+ for(D success: successes) {
+ addSuccess(success);
+ }
+ }
+
+ /**
+ * @return The successful writes recorded so far. This is the live, mutable list.
+ */
+ public List<WriteSuccess<D>> getSuccesses() {
+ return successes;
+ }
+
+ /**
+ * Record a failed write.
+ * @param failure The failed write.
+ */
+ public void add(WriteFailure<D> failure) {
+ this.failures.add(failure);
+ }
+
+ /**
+ * Record that a document failed to be written.
+ * @param document The document that could not be written.
+ * @param cause The reason for the failure.
+ * @param message A description of the failure.
+ */
+ public void addFailure(D document, Throwable cause, String message) {
+ add(new WriteFailure<D>(document, cause, message));
+ }
+
+ /**
+ * @return The failed writes recorded so far. This is the live, mutable list.
+ */
+ public List<WriteFailure<D>> getFailures() {
+ return failures;
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
new file mode 100644
index 0000000..9e6e568
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Writes documents to an Elasticsearch index in bulk.
+ *
+ * <p>Documents are queued by {@link #addDocument} and sent in a single bulk request by
+ * {@link #write()}. Every call to {@link #write()} empties the queue, whether or not the
+ * documents were written successfully.
+ *
+ * @param <D> The type of document to write.
+ */
+public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> {
+
+ /**
+ * A {@link Document} along with the index it will be written to.
+ */
+ private class Indexable {
+ final D document;
+ final String index;
+
+ public Indexable(D document, String index) {
+ this.document = document;
+ this.index = index;
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ElasticsearchClient client;
+ private final List<Indexable> documents;
+ private WriteRequest.RefreshPolicy refreshPolicy;
+
+ public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) {
+ this.client = client;
+ this.documents = new ArrayList<>();
+ this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+ }
+
+ @Override
+ public void addDocument(D document, String indexName) {
+ documents.add(new Indexable(document, indexName));
+ LOG.debug("Adding document to batch; document={}, index={}", document, indexName);
+ }
+
+ /**
+ * Writes every queued document in a single bulk request.
+ *
+ * <p>A document that cannot be turned into an index request (for example, one without a
+ * timestamp) is reported as a failure on its own rather than aborting the whole batch. If
+ * the bulk request cannot be submitted, every submitted document is reported as a failure.
+ *
+ * @return The documents that were, and were not, written.
+ */
+ @Override
+ public BulkDocumentWriterResults<D> write() {
+ BulkDocumentWriterResults<D> results = new BulkDocumentWriterResults<>();
+
+ // capture the batch size now; the batch is always emptied before this method returns
+ int batchSize = documents.size();
+
+ // Elasticsearch identifies each item of the response by its position in the request,
+ // so track exactly which documents were added to the request, in order.
+ List<Indexable> submitted = new ArrayList<>(batchSize);
+ try {
+ // create an index request for each document
+ BulkRequest bulkRequest = new BulkRequest();
+ bulkRequest.setRefreshPolicy(refreshPolicy);
+ for(Indexable doc: documents) {
+ try {
+ DocWriteRequest request = createRequest(doc.document, doc.index);
+ bulkRequest.add(request);
+ submitted.add(doc);
+
+ } catch(IllegalArgumentException e) {
+ // an invalid document fails on its own; the rest of the batch is still written
+ results.addFailure(doc.document, e, ExceptionUtils.getRootCauseMessage(e));
+ LOG.error("Unable to create index request; guid={}", doc.document.getGuid(), e);
+ }
+ }
+
+ // Elasticsearch rejects a bulk request that contains no requests
+ if(!submitted.isEmpty()) {
+ // submit the request and handle the response
+ BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
+ handleBulkResponse(bulkResponse, submitted, results);
+ }
+
+ } catch(IOException e) {
+ // assume all submitted documents have failed
+ for(Indexable indexable: submitted) {
+ D failed = indexable.document;
+ results.addFailure(failed, e, ExceptionUtils.getRootCauseMessage(e));
+ }
+ LOG.error("Failed to submit bulk request; all documents failed", e);
+
+ } finally {
+ // flush all documents no matter which ones succeeded or failed
+ documents.clear();
+ }
+
+ LOG.debug("Wrote document(s) to Elasticsearch; batchSize={}, success={}, failed={}",
+ batchSize, results.getSuccesses().size(), results.getFailures().size());
+ return results;
+ }
+
+ @Override
+ public int size() {
+ return documents.size();
+ }
+
+ /**
+ * Set the refresh policy applied to each bulk request.
+ * @param refreshPolicy The refresh policy.
+ * @return This writer, to allow chaining.
+ */
+ public ElasticsearchBulkDocumentWriter<D> withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
+ this.refreshPolicy = refreshPolicy;
+ return this;
+ }
+
+ /**
+ * Create the request that indexes a document.
+ * @param document The document to index.
+ * @param index The name of the index to write to.
+ * @return The index request.
+ * @throws IllegalArgumentException If the document does not have a timestamp.
+ */
+ private IndexRequest createRequest(D document, String index) {
+ if(document.getTimestamp() == null) {
+ throw new IllegalArgumentException("Document must contain the timestamp");
+ }
+ return new IndexRequest()
+ .source(document.getDocument())
+ .type(document.getSensorType() + "_doc")
+ .id(document.getGuid())
+ .index(index)
+ .timestamp(document.getTimestamp().toString());
+ }
+
+ /**
+ * Handles the {@link BulkResponse} received from Elasticsearch.
+ * @param bulkResponse The response received from Elasticsearch.
+ * @param documents The documents included in the bulk request, in request order.
+ * @param results The writer results.
+ */
+ private void handleBulkResponse(BulkResponse bulkResponse, List<Indexable> documents, BulkDocumentWriterResults<D> results) {
+ if (bulkResponse.hasFailures()) {
+
+ // interrogate the response to distinguish between those that succeeded and those that failed
+ for(BulkItemResponse response: bulkResponse) {
+ // the item id is the position of the item within the bulk request
+ D document = documents.get(response.getItemId()).document;
+ if(response.isFailed()) {
+ // request failed
+ Exception cause = response.getFailure().getCause();
+ String message = response.getFailureMessage();
+ results.addFailure(document, cause, message);
+
+ } else {
+ // request succeeded
+ results.addSuccess(document);
+ }
+ }
+ } else {
+ // all requests succeeded
+ for(Indexable success: documents) {
+ results.addSuccess(success.document);
+ }
+ }
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java
new file mode 100644
index 0000000..ac571c7
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+/**
+ * Indicates that a document failed to be written by a {@link BulkDocumentWriter}.
+ * @param <D> The type of document that failed to write.
+ */
+public class WriteFailure<D extends Document> {
+ private final D document;
+ private final Throwable cause;
+ private final String message;
+
+ /**
+ * @param document The document that failed to write.
+ * @param cause The reason the write failed.
+ * @param message A description of the failure.
+ */
+ public WriteFailure(D document, Throwable cause, String message) {
+ this.document = document;
+ this.cause = cause;
+ this.message = message;
+ }
+
+ /**
+ * @return The document that failed to write.
+ */
+ public D getDocument() {
+ return document;
+ }
+
+ /**
+ * @return The reason the write failed.
+ */
+ public Throwable getCause() {
+ return cause;
+ }
+
+ /**
+ * @return A description of the failure.
+ */
+ public String getMessage() {
+ return message;
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java
new file mode 100644
index 0000000..a86325d
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+/**
+ * Indicates that a document was successfully written by a {@link BulkDocumentWriter}.
+ * @param <D> The type of document written.
+ */
+public class WriteSuccess<D extends Document> {
+ private final D document;
+
+ /**
+ * @param document The document that was written.
+ */
+ public WriteSuccess(D document) {
+ this.document = document;
+ }
+
+ /**
+ * @return The document that was written.
+ */
+ public D getDocument() {
+ return document;
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 675d22f..7226c30 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -191,7 +191,7 @@ public class ElasticsearchDao implements IndexDao {
}
+ /**
+ * Find the name of the index that contains the document with the given GUID.
+ * @param guid The GUID of the document.
+ * @param sensorType The sensor type of the document.
+ * @return The name of the index, if one was found.
+ */
protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
- return updateDao.getIndexName(guid, sensorType);
+ return updateDao.findIndexNameByGUID(guid, sensorType);
}
protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder)
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index ba852aa..fa02f8d 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -17,18 +17,9 @@
*/
package org.apache.metron.elasticsearch.dao;
-import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.WriteFailure;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.AccessConfig;
@@ -36,84 +27,80 @@ import org.apache.metron.indexing.dao.search.AlertComment;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
+
+import static java.lang.String.format;
+
public class ElasticsearchUpdateDao implements UpdateDao {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private transient ElasticsearchClient client;
private AccessConfig accessConfig;
private ElasticsearchRetrieveLatestDao retrieveLatestDao;
- private WriteRequest.RefreshPolicy refreshPolicy;
+ private ElasticsearchBulkDocumentWriter<Document> documentWriter;
+ /**
+ * @param client The client used to write documents to Elasticsearch.
+ * @param accessConfig Supplies the global configuration used to build index names.
+ * @param searchDao Used to find the index that already contains a document with a given GUID.
+ */
public ElasticsearchUpdateDao(ElasticsearchClient client,
AccessConfig accessConfig,
ElasticsearchRetrieveLatestDao searchDao) {
- this.client = client;
this.accessConfig = accessConfig;
this.retrieveLatestDao = searchDao;
- this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+ // the refresh policy starts as NONE; withRefreshPolicy(...) can change it
+ this.documentWriter = new ElasticsearchBulkDocumentWriter<>(client)
+ .withRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
}
@Override
public Document update(Document update, Optional<String> index) throws IOException {
- String indexPostfix = ElasticsearchUtils
- .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
- String sensorType = update.getSensorType();
- String indexName = getIndexName(update, index, indexPostfix);
-
- IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
- try {
- IndexResponse response = client.getHighLevelClient().index(indexRequest);
-
- ShardInfo shardInfo = response.getShardInfo();
- int failed = shardInfo.getFailed();
- if (failed > 0) {
- throw new IOException(
- "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures()));
- }
- } catch (Exception e) {
- throw new IOException(e.getMessage(), e);
- }
- return update;
+ // a single update is written as a batch that contains only this document
+ Map<Document, Optional<String>> batch = new HashMap<>();
+ batch.put(update, index);
+ Map<Document, Optional<String>> written = batchUpdate(batch);
+
+ // batchUpdate returns the batch it was given, so its only key is the updated document
+ return written.keySet().iterator().next();
}
+ /**
+ * Write a batch of documents. Each document is written to the index given for it or, when none
+ * is given, to the index chosen by {@link #getIndexName(Document, String)}.
+ * @param updates The documents to write, each with an optional index name.
+ * @return The same updates that were passed in.
+ * @throws IOException If an index name cannot be determined or if any document fails to write.
+ */
@Override
public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
- String indexPostfix = ElasticsearchUtils
- .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
-
- BulkRequest bulkRequestBuilder = new BulkRequest();
- bulkRequestBuilder.setRefreshPolicy(refreshPolicy);
-
- // Get the indices we'll actually be using for each Document.
- for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
- Document update = updateEntry.getKey();
- String sensorType = update.getSensorType();
- String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix);
- IndexRequest indexRequest = buildIndexRequest(
- update,
- sensorType,
- indexName
- );
-
- bulkRequestBuilder.add(indexRequest);
+ Map<String, Object> globalConfig = accessConfig.getGlobalConfigSupplier().get();
+ String indexPostfix = ElasticsearchUtils.getIndexFormat(globalConfig).format(new Date());
+
+ // resolve every index name before queueing anything, so that a failed lookup cannot leave
+ // part of this batch queued in the shared writer, to be written by some later call
+ Map<Document, String> indexNames = new HashMap<>();
+ for (Map.Entry<Document, Optional<String>> entry : updates.entrySet()) {
+ Document document = entry.getKey();
+ Optional<String> optionalIndex = entry.getValue();
+ // only search for the index when none was given; orElse(...) would always run the search
+ String indexName = optionalIndex.isPresent() ? optionalIndex.get() : getIndexName(document, indexPostfix);
+ indexNames.put(document, indexName);
}
- BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder);
- if (bulkResponse.hasFailures()) {
- LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage());
- throw new IOException(
- "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage());
+ // the writer is a shared field; hold it while queueing and writing so that concurrent calls
+ // cannot write, or report failures for, each other's documents
+ BulkDocumentWriterResults<Document> results;
+ synchronized (documentWriter) {
+ indexNames.forEach(documentWriter::addDocument);
+ results = documentWriter.write();
+ }
+
+ // if any document fails, raise an exception.
+ int failures = results.getFailures().size();
+ if(failures > 0) {
+ int successes = results.getSuccesses().size();
+ String msg = format("Failed to update all documents; %d successes, %d failures", successes, failures);
+ LOG.error(msg);
+
+ // log each individual failure
+ for(WriteFailure<Document> failure: results.getFailures()) {
+ LOG.error(failure.getMessage(), failure.getCause());
+ }
+
+ // raise an exception using the first exception as the root cause, although there may be many
+ Throwable cause = results.getFailures().get(0).getCause();
+ throw new IOException(msg, cause);
}
+
return updates;
}
@@ -187,32 +174,19 @@ public class ElasticsearchUpdateDao implements UpdateDao {
}
+ /**
+ * Set the refresh policy used when documents are written.
+ * @param refreshPolicy The refresh policy.
+ * @return This DAO, to allow chaining.
+ */
public ElasticsearchUpdateDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
- this.refreshPolicy = refreshPolicy;
+ documentWriter.withRefreshPolicy(refreshPolicy);
return this;
}
- protected String getIndexName(Document update, Optional<String> index, String indexPostFix) throws IOException {
- return index.orElse(getIndexName(update.getGuid(), update.getSensorType())
- .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null))
- );
+ /**
+ * Determine the index a document should be written to: the index that already contains a
+ * document with the same GUID if there is one, otherwise an index built from the document's
+ * sensor type and the given postfix.
+ * @param update The document to write.
+ * @param indexPostFix The postfix used to build an index name when no existing index is found.
+ * @return The name of the index.
+ * @throws IOException If the search for an existing index fails.
+ */
+ protected String getIndexName(Document update, String indexPostFix) throws IOException {
+ String sensorType = update.getSensorType();
+ Optional<String> existingIndex = findIndexNameByGUID(update.getGuid(), sensorType);
+ return existingIndex.orElse(ElasticsearchUtils.getIndexName(sensorType, indexPostFix, null));
}
- protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
- return retrieveLatestDao.searchByGuid(guid,
- sensorType,
- hit -> Optional.ofNullable(hit.getIndex())
- );
- }
-
- protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) {
- String type = sensorType + "_doc";
- Object ts = update.getTimestamp();
- IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
- .source(update.getDocument());
- if (ts != null) {
- indexRequest = indexRequest.timestamp(ts.toString());
- }
-
- return indexRequest;
+ /**
+ * Search for the document with the given GUID and return the name of the index that holds it.
+ * @param guid The GUID of the document.
+ * @param sensorType The sensor type of the document.
+ * @return The index name taken from the matching search hit, if there is one.
+ * @throws IOException If the search fails.
+ */
+ protected Optional<String> findIndexNameByGUID(String guid, String sensorType) throws IOException {
+ return retrieveLatestDao.searchByGuid(
+ guid,
+ sensorType,
+ hit -> Optional.ofNullable(hit.getIndex()));
}
}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index fbdd4fe..a3459d8 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,21 +17,24 @@
*/
package org.apache.metron.elasticsearch.writer;
+import com.google.common.collect.Lists;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.apache.metron.common.field.FieldNameConverter;
import org.apache.metron.common.field.FieldNameConverters;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.WriteFailure;
+import org.apache.metron.elasticsearch.bulk.WriteSuccess;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,11 +42,14 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.lang.invoke.MethodHandles;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.Date;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static java.lang.String.format;
+import static org.apache.metron.stellar.common.Constants.Fields.TIMESTAMP;
+
/**
* A {@link BulkMessageWriter} that writes messages to Elasticsearch.
*/
@@ -57,89 +63,110 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
private transient ElasticsearchClient client;
/**
+ * Responsible for writing documents.
+ *
+ * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
+ * a {@link Tuple} and the document created from the contents of that tuple. If
+ * a document cannot be written, the associated tuple needs to be failed.
+ */
+ private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
+
+ /**
* A simple data formatter used to build the appropriate Elasticsearch index name.
*/
private SimpleDateFormat dateFormat;
-
+ /**
+ * Initialize the writer. An Elasticsearch client and document writer are created only if no
+ * document writer has been set already (for example, by {@link #setDocumentWriter}).
+ */
@Override
public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
-
Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
- client = ElasticsearchClientFactory.create(globalConfiguration);
dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
+
+ // only create the document writer, if one does not already exist. useful for testing.
+ if(documentWriter == null) {
+ client = ElasticsearchClientFactory.create(globalConfiguration);
+ documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
+ }
}
+ /**
+ * Write a batch of messages to Elasticsearch, one document per message.
+ *
+ * @param sensorType The type of sensor that produced the messages.
+ * @param configurations The writer configuration.
+ * @param tuplesIter The tuples the messages came from, in the same order as the messages.
+ * @param messages The messages to write.
+ * @return Which tuples were written successfully and which failed.
+ * @throws IllegalStateException If the number of tuples does not match the number of messages.
+ */
@Override
- public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
+ public BulkWriterResponse write(String sensorType,
+ WriterConfiguration configurations,
+ Iterable<Tuple> tuplesIter,
+ List<JSONObject> messages) {
// fetch the field name converter for this sensor type
FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
+ String indexPostfix = dateFormat.format(new Date());
+ String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
+
+ // the number of tuples must match the number of messages
+ List<Tuple> tuples = Lists.newArrayList(tuplesIter);
+ int batchSize = tuples.size();
+ if(messages.size() != batchSize) {
+ throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
+ tuples.size(), messages.size()));
+ }
- final String indexPostfix = dateFormat.format(new Date());
- BulkRequest bulkRequest = new BulkRequest();
- for(JSONObject message: messages) {
-
- JSONObject esDoc = new JSONObject();
- for(Object k : message.keySet()){
- copyField(k.toString(), message, esDoc, fieldNameConverter);
- }
-
- String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
- IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
- indexRequest.source(esDoc.toJSONString());
- String guid = (String)esDoc.get(Constants.GUID);
- if(guid != null) {
- indexRequest.id(guid);
- }
-
- Object ts = esDoc.get("timestamp");
- if(ts != null) {
- indexRequest.timestamp(ts.toString());
- }
- bulkRequest.add(indexRequest);
+ // create a document from each message. all documents are created before any is queued, so a
+ // message that cannot be converted does not leave part of this batch queued in the writer
+ List<TupleBasedDocument> documents = new ArrayList<>(batchSize);
+ for(int i=0; i<batchSize; i++) {
+ JSONObject message = messages.get(i);
+ Tuple tuple = tuples.get(i);
+ documents.add(createDocument(message, tuple, sensorType, fieldNameConverter));
}
- BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
- return buildWriteReponse(tuples, bulkResponse);
- }
+ for(TupleBasedDocument document: documents) {
+ documentWriter.addDocument(document, indexName);
+ }
+
+ // write the documents
+ BulkDocumentWriterResults<TupleBasedDocument> results = documentWriter.write();
- @Override
- public String getName() {
- return "elasticsearch";
+ // build the response
+ BulkWriterResponse response = new BulkWriterResponse();
+ for(WriteSuccess<TupleBasedDocument> success: results.getSuccesses()) {
+ response.addSuccess(success.getDocument().getTuple());
+ }
+ for(WriteFailure<TupleBasedDocument> failure: results.getFailures()) {
+ response.addError(failure.getCause(), failure.getDocument().getTuple());
+ }
+ return response;
}
- protected BulkWriterResponse buildWriteReponse(Iterable<Tuple> tuples, BulkResponse bulkResponse) throws Exception {
- // Elasticsearch responses are in the same order as the request, giving us an implicit mapping with Tuples
- BulkWriterResponse writerResponse = new BulkWriterResponse();
- if (bulkResponse.hasFailures()) {
- Iterator<BulkItemResponse> respIter = bulkResponse.iterator();
- Iterator<Tuple> tupleIter = tuples.iterator();
- while (respIter.hasNext() && tupleIter.hasNext()) {
- BulkItemResponse item = respIter.next();
- Tuple tuple = tupleIter.next();
-
- if (item.isFailed()) {
- writerResponse.addError(item.getFailure().getCause(), tuple);
- } else {
- writerResponse.addSuccess(tuple);
- }
-
- // Should never happen, so fail the entire batch if it does.
- if (respIter.hasNext() != tupleIter.hasNext()) {
- throw new Exception(bulkResponse.buildFailureMessage());
- }
- }
+ /**
+ * Create the document that is indexed for a message.
+ *
+ * @param message The message to index.
+ * @param tuple The tuple the message came from, kept with the document so that the write
+ * response can report on the tuple.
+ * @param sensorType The type of sensor that produced the message.
+ * @param fieldNameConverter Converts the message's field names to the document's field names.
+ * @return The document to index.
+ * @throws NumberFormatException If the timestamp field is present but is not a whole number.
+ */
+ private TupleBasedDocument createDocument(JSONObject message,
+ Tuple tuple,
+ String sensorType,
+ FieldNameConverter fieldNameConverter) {
+ // transform the message fields to the source fields of the indexed document
+ JSONObject source = new JSONObject();
+ for(Object k : message.keySet()){
+ copyField(k.toString(), message, source, fieldNameConverter);
+ }
+
+ // define the document id
+ String guid = ConversionUtils.convert(source.get(Constants.GUID), String.class);
+ if(guid == null) {
+ LOG.warn("Missing '{}' field; document ID will be auto-generated.", Constants.GUID);
+ }
+
+ // define the document timestamp
+ // NOTE(review): ElasticsearchBulkDocumentWriter.createRequest rejects documents whose timestamp
+ // is null; confirm the "system time" fallback promised by the warning below actually happens.
+ Long timestamp = null;
+ Object value = source.get(TIMESTAMP.getName());
+ if(value != null) {
+ timestamp = Long.parseLong(value.toString());
} else {
- writerResponse.addAllSuccesses(tuples);
+ LOG.warn("Missing '{}' field; timestamp will be set to system time.", TIMESTAMP.getName());
}
- return writerResponse;
+ return new TupleBasedDocument(source, guid, sensorType, timestamp, tuple);
+ }
+
+ /**
+ * @return The name of this writer, "elasticsearch".
+ */
+ @Override
+ public String getName() {
+ return "elasticsearch";
}
@Override
public void close() throws Exception {
- client.close();
+ if(client != null) { // NOTE(review): skips closing a client that was never created, presumably because init() was not called; confirm
+ client.close();
+ }
}
/**
@@ -167,5 +194,13 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
// copy the field
destination.put(destinationFieldName, source.get(sourceFieldName));
}
+
+ /**
+ * Sets the {@link BulkDocumentWriter} that documents are written through; primarily used to inject a mock in tests.
+ * @param documentWriter the writer that receives the {@link TupleBasedDocument}s to index.
+ */
+ public void setDocumentWriter(BulkDocumentWriter<TupleBasedDocument> documentWriter) {
+ this.documentWriter = documentWriter;
+ }
}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java
new file mode 100644
index 0000000..ba44937
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.writer;
+
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
+
+/**
+ * A {@link Document} created from the contents of a Storm {@link Tuple}, which it retains via {@link #getTuple()}.
+ */
+public class TupleBasedDocument extends Document {
+ /** The tuple this document was created from; never reassigned after construction. */
+ private final Tuple tuple;
+ /** Creates the document via the {@link Document} constructor and remembers the originating {@code tuple}. */
+ public TupleBasedDocument(Map<String, Object> document,
+ String guid,
+ String sensorType,
+ Long timestamp,
+ Tuple tuple) {
+ super(document, guid, sensorType, timestamp);
+ this.tuple = tuple;
+ }
+ /** @return the tuple this document was created from */
+ public Tuple getTuple() {
+ return tuple;
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
new file mode 100644
index 0000000..b313811
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ElasticsearchBulkDocumentWriterTest {
+
+ ElasticsearchBulkDocumentWriter<Document> writer;
+ ElasticsearchClient client;
+ RestHighLevelClient highLevelClient;
+
+ @Before
+ public void setup() {
+ // mock both Elasticsearch clients; the writer under test is built on the mocked client
+ highLevelClient = mock(RestHighLevelClient.class);
+ client = mock(ElasticsearchClient.class);
+ when(client.getHighLevelClient()).thenReturn(highLevelClient);
+
+ writer = new ElasticsearchBulkDocumentWriter<>(client);
+ }
+
+ @Test
+ public void testWriteSuccess() throws IOException {
+ setupElasticsearchToSucceed();
+
+ // queue one document; the mocked bulk response reports no failures
+ Document doc = document(message());
+ String index = "bro_index";
+ writer.addDocument(doc, index);
+
+ BulkDocumentWriterResults<Document> results = writer.write();
+ assertEquals(1, results.getSuccesses().size());
+ assertEquals(0, results.getFailures().size());
+
+ WriteSuccess<Document> success = results.getSuccesses().get(0);
+ assertEquals(doc, success.getDocument());
+ }
+
+ @Test
+ public void testWriteFailure() throws IOException {
+ setupElasticsearchToFail();
+
+ // queue one document; the mocked bulk response reports it as failed
+ Document doc = document(message());
+ String index = "bro_index";
+ writer.addDocument(doc, index);
+
+ BulkDocumentWriterResults<Document> results = writer.write();
+ assertEquals(0, results.getSuccesses().size());
+ assertEquals(1, results.getFailures().size());
+
+ WriteFailure<Document> failure = results.getFailures().get(0);
+ assertEquals(doc, failure.getDocument());
+ assertEquals("error message", failure.getMessage());
+ assertNotNull(failure.getCause());
+ }
+
+ @Test
+ public void testSizeWhenWriteSuccessful() throws IOException {
+ setupElasticsearchToSucceed();
+ assertEquals(0, writer.size());
+
+ // queue five documents; size() should count all of them as pending
+ String index = "bro_index";
+ writer.addDocument(document(message()), index);
+ writer.addDocument(document(message()), index);
+ writer.addDocument(document(message()), index);
+ writer.addDocument(document(message()), index);
+ writer.addDocument(document(message()), index);
+ assertEquals(5, writer.size());
+
+ // after a successful write, no documents should remain pending
+ writer.write();
+ assertEquals(0, writer.size());
+ }
+
+ @Test
+ public void testSizeWhenWriteFails() throws IOException {
+ setupElasticsearchToFail();
+ assertEquals(0, writer.size());
+
+ // queue five documents; size() should count all of them as pending
+ String index = "bro_index";
+ writer.addDocument(document(message()), index);
+ writer.addDocument(document(message()), index);
+ writer.addDocument(document(message()), index);
+ writer.addDocument(document(message()), index);
+ writer.addDocument(document(message()), index);
+ assertEquals(5, writer.size());
+
+ // even when the write fails, no documents should remain pending
+ writer.write();
+ assertEquals(0, writer.size());
+ }
+ /** Stubs the client to return a bulk response whose single item (id 0) failed with "error message". */
+ private void setupElasticsearchToFail() throws IOException {
+ // define the item failure
+ BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
+ when(failure.getCause()).thenReturn(new Exception("test exception"));
+ when(failure.getMessage()).thenReturn("error message");
+
+ // define the item-level response for item id 0
+ BulkItemResponse itemResponse = mock(BulkItemResponse.class);
+ when(itemResponse.isFailed()).thenReturn(true);
+ when(itemResponse.getItemId()).thenReturn(0);
+ when(itemResponse.getFailure()).thenReturn(failure);
+ when(itemResponse.getFailureMessage()).thenReturn("error message");
+ List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse);
+
+ // define the bulk response to indicate failure
+ BulkResponse response = mock(BulkResponse.class);
+ when(response.iterator()).thenReturn(itemsResponses.iterator());
+ when(response.hasFailures()).thenReturn(true);
+
+ // have the client return the mock response
+ when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
+ }
+ /** Stubs the client to return a bulk response that reports no failures. */
+ private void setupElasticsearchToSucceed() throws IOException {
+ // define the bulk response to indicate success
+ BulkResponse response = mock(BulkResponse.class);
+ when(response.hasFailures()).thenReturn(false);
+
+ // have the client return the mock response
+ when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
+ }
+ /** Wraps a message in a "bro" Document with a fresh random GUID (independent of the message's own) and the current time. */
+ private Document document(JSONObject message) {
+ String guid = UUID.randomUUID().toString();
+ String sensorType = "bro";
+ Long timestamp = System.currentTimeMillis();
+ return new Document(message, guid, sensorType, timestamp);
+ }
+ /** Builds a minimal message containing a GUID, a timestamp, and a source address. */
+ private JSONObject message() {
+ JSONObject message = new JSONObject();
+ message.put(Constants.GUID, UUID.randomUUID().toString());
+ message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
+ message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+ return message;
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
index 227f5ef..dfdf88e 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
@@ -17,40 +17,11 @@
*/
package org.apache.metron.elasticsearch.integration.components;
-import static java.util.Arrays.asList;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BooleanSupplier;
-
-import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.io.FileUtils;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.elasticsearch.client.ElasticsearchClient;
-import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
-import org.apache.metron.elasticsearch.dao.ElasticsearchColumnMetadataDao;
import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchRequestSubmitter;
-import org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchSearchDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchUpdateDao;
-import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.update.Document;
-import org.apache.metron.indexing.dao.update.UpdateDao;
import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.stellar.common.utils.ConversionUtils;
@@ -62,9 +33,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
@@ -76,11 +44,23 @@ import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.Netty4Plugin;
-import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+
public class ElasticSearchComponent implements InMemoryComponent {
private static class Mapping {
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
index 6a3638b..e5e85b0 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
@@ -18,170 +18,290 @@
package org.apache.metron.elasticsearch.writer;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.storm.tuple.Tuple;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.junit.Test;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class ElasticsearchWriterTest {
- @Test
- public void testSingleSuccesses() throws Exception {
- Tuple tuple1 = mock(Tuple.class);
- BulkResponse response = mock(BulkResponse.class);
- when(response.hasFailures()).thenReturn(false);
+ Map stormConf;
+ TopologyContext topologyContext;
+ WriterConfiguration writerConfiguration;
- BulkWriterResponse expected = new BulkWriterResponse();
- expected.addSuccess(tuple1);
+ @Before
+ public void setup() {
+ topologyContext = mock(TopologyContext.class);
- ElasticsearchWriter esWriter = new ElasticsearchWriter();
- BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
+ writerConfiguration = mock(WriterConfiguration.class);
+ when(writerConfiguration.getGlobalConfig()).thenReturn(globals());
- assertEquals("Response should have no errors and single success", expected, actual);
+ stormConf = new HashMap();
}
@Test
- public void testMultipleSuccesses() throws Exception {
- Tuple tuple1 = mock(Tuple.class);
- Tuple tuple2 = mock(Tuple.class);
-
- BulkResponse response = mock(BulkResponse.class);
- when(response.hasFailures()).thenReturn(false);
-
- BulkWriterResponse expected = new BulkWriterResponse();
- expected.addSuccess(tuple1);
- expected.addSuccess(tuple2);
-
+ public void shouldWriteSuccessfully() {
+ // create a tuple and a message associated with that tuple
+ List<Tuple> tuples = createTuples(1);
+ List<JSONObject> messages = createMessages(1);
+
+ // mock a document writer whose write() reports every document as a success
+ BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+ results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
+ BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+ when(docWriter.write()).thenReturn(results);
+
+ // write through an ElasticsearchWriter that uses the mocked document writer
ElasticsearchWriter esWriter = new ElasticsearchWriter();
- BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
+ esWriter.setDocumentWriter(docWriter);
+ esWriter.init(stormConf, topologyContext, writerConfiguration);
+ BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
- assertEquals("Response should have no errors and two successes", expected, actual);
+ // response should only contain successes
+ assertFalse(response.hasErrors());
+ assertTrue(response.getSuccesses().contains(tuples.get(0)));
}
@Test
- public void testSingleFailure() throws Exception {
- Tuple tuple1 = mock(Tuple.class);
-
- BulkResponse response = mock(BulkResponse.class);
- when(response.hasFailures()).thenReturn(true);
-
- Exception e = new IllegalStateException();
- BulkItemResponse itemResponse = buildBulkItemFailure(e);
- when(response.iterator()).thenReturn(ImmutableList.of(itemResponse).iterator());
-
- BulkWriterResponse expected = new BulkWriterResponse();
- expected.addError(e, tuple1);
-
+ public void shouldWriteManySuccessfully() {
+ // create a few tuples and the messages associated with the tuples
+ List<Tuple> tuples = createTuples(3);
+ List<JSONObject> messages = createMessages(3);
+
+ // mock a document writer whose write() reports every document as a success
+ BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+ results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
+ results.addSuccess(createDocument(messages.get(1), tuples.get(1)));
+ results.addSuccess(createDocument(messages.get(2), tuples.get(2)));
+ BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+ when(docWriter.write()).thenReturn(results);
+
+ // write through an ElasticsearchWriter that uses the mocked document writer
ElasticsearchWriter esWriter = new ElasticsearchWriter();
- BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
-
- assertEquals("Response should have one error and zero successes", expected, actual);
+ esWriter.setDocumentWriter(docWriter);
+ esWriter.init(stormConf, topologyContext, writerConfiguration);
+ BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+ // response should only contain successes
+ assertFalse(response.hasErrors());
+ assertTrue(response.getSuccesses().contains(tuples.get(0)));
+ assertTrue(response.getSuccesses().contains(tuples.get(1)));
+ assertTrue(response.getSuccesses().contains(tuples.get(2)));
}
@Test
- public void testTwoSameFailure() throws Exception {
- Tuple tuple1 = mock(Tuple.class);
- Tuple tuple2 = mock(Tuple.class);
-
- BulkResponse response = mock(BulkResponse.class);
- when(response.hasFailures()).thenReturn(true);
-
- Exception e = new IllegalStateException();
-
- BulkItemResponse itemResponse = buildBulkItemFailure(e);
- BulkItemResponse itemResponse2 = buildBulkItemFailure(e);
-
- when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator());
-
- BulkWriterResponse expected = new BulkWriterResponse();
- expected.addError(e, tuple1);
- expected.addError(e, tuple2);
-
+ public void shouldHandleWriteFailure() {
+ // create a tuple and a message associated with that tuple
+ List<Tuple> tuples = createTuples(1);
+ List<JSONObject> messages = createMessages(1);
+ Exception cause = new Exception();
+
+ // mock a document writer whose write() reports every document as a failure with the same cause
+ BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+ results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
+ BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+ when(docWriter.write()).thenReturn(results);
+
+ // write through an ElasticsearchWriter that uses the mocked document writer
ElasticsearchWriter esWriter = new ElasticsearchWriter();
- BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
-
- assertEquals("Response should have two errors and no successes", expected, actual);
-
- // Ensure the errors actually get collapsed together
- Map<Throwable, Collection<Tuple>> actualErrors = actual.getErrors();
- HashMap<Throwable, Collection<Tuple>> expectedErrors = new HashMap<>();
- expectedErrors.put(e, ImmutableList.of(tuple1, tuple2));
- assertEquals("Errors should have collapsed together", expectedErrors, actualErrors);
+ esWriter.setDocumentWriter(docWriter);
+ esWriter.init(stormConf, topologyContext, writerConfiguration);
+ BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+ // the writer response should only contain failures
+ assertEquals(0, response.getSuccesses().size());
+ assertEquals(1, response.getErrors().size());
+ Collection<Tuple> errors = response.getErrors().get(cause);
+ assertTrue(errors.contains(tuples.get(0)));
}
@Test
- public void testTwoDifferentFailure() throws Exception {
- Tuple tuple1 = mock(Tuple.class);
- Tuple tuple2 = mock(Tuple.class);
+ public void shouldHandleManyWriteFailures() {
+ // create a few tuples and the messages associated with the tuples
+ int count = 3;
+ List<Tuple> tuples = createTuples(count);
+ List<JSONObject> messages = createMessages(count);
+ Exception cause = new Exception();
+
+ // mock a document writer whose write() reports every document as a failure with the same cause
+ BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+ results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
+ results.addFailure(createDocument(messages.get(1), tuples.get(1)), cause, "error");
+ results.addFailure(createDocument(messages.get(2), tuples.get(2)), cause, "error");
+ BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+ when(docWriter.write()).thenReturn(results);
+
+ // write through an ElasticsearchWriter that uses the mocked document writer
+ ElasticsearchWriter esWriter = new ElasticsearchWriter();
+ esWriter.setDocumentWriter(docWriter);
+ esWriter.init(stormConf, topologyContext, writerConfiguration);
+ BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+ // the writer response should only contain failures
+ assertEquals(0, response.getSuccesses().size());
+ assertEquals(1, response.getErrors().size());
+ Collection<Tuple> errors = response.getErrors().get(cause);
+ assertTrue(errors.contains(tuples.get(0)));
+ assertTrue(errors.contains(tuples.get(1)));
+ assertTrue(errors.contains(tuples.get(2)));
+ }
- BulkResponse response = mock(BulkResponse.class);
- when(response.hasFailures()).thenReturn(true);
+ @Test
+ public void shouldHandlePartialFailures() {
+ // create a few tuples and the messages associated with the tuples
+ int count = 2;
+ List<Tuple> tuples = createTuples(count);
+ List<JSONObject> messages = createMessages(count);
+ Exception cause = new Exception();
+
+ // mock a document writer whose write() fails the first document and succeeds the second
+ BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+ results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
+ results.addSuccess(createDocument(messages.get(1), tuples.get(1)));
+ BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+ when(docWriter.write()).thenReturn(results);
+
+ // write through an ElasticsearchWriter that uses the mocked document writer
+ ElasticsearchWriter esWriter = new ElasticsearchWriter();
+ esWriter.setDocumentWriter(docWriter);
+ esWriter.init(stormConf, topologyContext, writerConfiguration);
+ BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+ // response should contain one success and one failure, each mapped back to its own tuple
+ assertEquals(1, response.getSuccesses().size());
+ assertEquals(1, response.getErrors().size());
+ assertTrue(response.getErrors().get(cause).contains(tuples.get(0)));
+ assertTrue(response.getSuccesses().contains(tuples.get(1)));
+ }
- Exception e = new IllegalStateException("Cause");
- Exception e2 = new IllegalStateException("Different Cause");
- BulkItemResponse itemResponse = buildBulkItemFailure(e);
- BulkItemResponse itemResponse2 = buildBulkItemFailure(e2);
+ @Test(expected = IllegalStateException.class)
+ public void shouldCheckIfNumberOfMessagesMatchNumberOfTuples() {
+ ElasticsearchWriter esWriter = new ElasticsearchWriter();
+ esWriter.setDocumentWriter(mock(BulkDocumentWriter.class));
+ esWriter.init(stormConf, topologyContext, writerConfiguration);
- when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator());
+ // there are 5 tuples and only 1 message; there should be 5 messages to match the number of tuples
+ List<Tuple> tuples = createTuples(5);
+ List<JSONObject> messages = createMessages(1);
- BulkWriterResponse expected = new BulkWriterResponse();
- expected.addError(e, tuple1);
- expected.addError(e2, tuple2);
+ esWriter.write("bro", writerConfiguration, tuples, messages);
+ fail("expected exception");
+ }
+ @Test
+ public void shouldWriteSuccessfullyWhenMessageTimestampIsString() {
+ List<Tuple> tuples = createTuples(1);
+ List<JSONObject> messages = createMessages(1);
+
+ // the timestamp is a String, rather than a Long
+ messages.get(0).put(Constants.Fields.TIMESTAMP.getName(), new Long(System.currentTimeMillis()).toString());
+
+ // build the expected document, parsing the String timestamp back into a Long
+ JSONObject message = messages.get(0);
+ String timestamp = (String) message.get(Constants.Fields.TIMESTAMP.getName());
+ String guid = (String) message.get(Constants.GUID);
+ String sensorType = (String) message.get(Constants.SENSOR_TYPE);
+ TupleBasedDocument document = new TupleBasedDocument(message, guid, sensorType, Long.parseLong(timestamp), tuples.get(0));
+
+ // mock a document writer whose write() reports that document as a success
+ BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+ results.addSuccess(document);
+ BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+ when(docWriter.write()).thenReturn(results);
+
+ // write through an ElasticsearchWriter that uses the mocked document writer
ElasticsearchWriter esWriter = new ElasticsearchWriter();
- BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
+ esWriter.setDocumentWriter(docWriter);
+ esWriter.init(stormConf, topologyContext, writerConfiguration);
+ BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
- assertEquals("Response should have two errors and no successes", expected, actual);
-
- // Ensure the errors did not get collapsed together
- Map<Throwable, Collection<Tuple>> actualErrors = actual.getErrors();
- HashMap<Throwable, Collection<Tuple>> expectedErrors = new HashMap<>();
- expectedErrors.put(e, ImmutableList.of(tuple1));
- expectedErrors.put(e2, ImmutableList.of(tuple2));
- assertEquals("Errors should not have collapsed together", expectedErrors, actualErrors);
+ // response should only contain successes
+ assertFalse(response.hasErrors());
+ assertTrue(response.getSuccesses().contains(tuples.get(0)));
}
@Test
- public void testSuccessAndFailure() throws Exception {
- Tuple tuple1 = mock(Tuple.class);
- Tuple tuple2 = mock(Tuple.class);
+ public void shouldWriteSuccessfullyWhenMissingGUID() {
+ // create a tuple and a message associated with that tuple
+ List<Tuple> tuples = createTuples(1);
+ List<JSONObject> messages = createMessages(1);
- BulkResponse response = mock(BulkResponse.class);
- when(response.hasFailures()).thenReturn(true);
+ // remove the GUID from the message (asserting it was present first)
+ assertNotNull(messages.get(0).remove(Constants.GUID));
- Exception e = new IllegalStateException("Cause");
- BulkItemResponse itemResponse = buildBulkItemFailure(e);
+ // mock a document writer whose write() reports every document as a success
+ BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+ results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
+ BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+ when(docWriter.write()).thenReturn(results);
- BulkItemResponse itemResponse2 = mock(BulkItemResponse.class);
- when(itemResponse2.isFailed()).thenReturn(false);
+ // write through an ElasticsearchWriter that uses the mocked document writer
+ ElasticsearchWriter esWriter = new ElasticsearchWriter();
+ esWriter.setDocumentWriter(docWriter);
+ esWriter.init(stormConf, topologyContext, writerConfiguration);
+ BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
- when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator());
+ // response should only contain successes
+ assertFalse(response.hasErrors());
+ assertTrue(response.getSuccesses().contains(tuples.get(0)));
+ }
- BulkWriterResponse expected = new BulkWriterResponse();
- expected.addError(e, tuple1);
- expected.addSuccess(tuple2);
+ private TupleBasedDocument createDocument(JSONObject message, Tuple tuple) {
+ Long timestamp = (Long) message.get(Constants.Fields.TIMESTAMP.getName());
+ String guid = (String) message.get(Constants.GUID);
+ String sensorType = (String) message.get(Constants.SENSOR_TYPE);
+ return new TupleBasedDocument(message, guid, sensorType, timestamp, tuple);
+ }
- ElasticsearchWriter esWriter = new ElasticsearchWriter();
- BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
+ private JSONObject message() {
+ JSONObject message = new JSONObject();
+ message.put(Constants.GUID, UUID.randomUUID().toString());
+ message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
+ message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+ message.put(Constants.SENSOR_TYPE, "sensor");
+ return message;
+ }
+ /** Global configuration returned by the mocked WriterConfiguration; only sets "es.date.format". */
+ private Map<String, Object> globals() {
+ Map<String, Object> globals = new HashMap<>();
+ globals.put("es.date.format", "yyyy.MM.dd.HH");
+ return globals;
+ }
- assertEquals("Response should have one error and one success", expected, actual);
+ private List<Tuple> createTuples(int count) {
+ List<Tuple> tuples = new ArrayList<>();
+ for(int i=0; i<count; i++) {
+ tuples.add(mock(Tuple.class));
+ }
+ return tuples;
}
- private BulkItemResponse buildBulkItemFailure(Exception e) {
- BulkItemResponse itemResponse = mock(BulkItemResponse.class);
- when(itemResponse.isFailed()).thenReturn(true);
- BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
- when(itemResponse.getFailure()).thenReturn(failure);
- when(failure.getCause()).thenReturn(e);
- return itemResponse;
+ private List<JSONObject> createMessages(int count) {
+ List<JSONObject> messages = new ArrayList<>();
+ for(int i=0; i<count; i++) {
+ messages.add(message());
+ }
+ return messages;
}
}
diff --git a/metron-platform/metron-indexing/src/test/resources/log4j.properties b/metron-platform/metron-indexing/src/test/resources/log4j.properties
new file mode 100644
index 0000000..e69de29