Posted to commits@metron.apache.org by ni...@apache.org on 2018/10/11 20:30:01 UTC

metron git commit: METRON-1801 Allow Customization of Elasticsearch Document ID (nickwallen) closes apache/metron#1218

Repository: metron
Updated Branches:
  refs/heads/master 9b6710053 -> 90c5e1d28


METRON-1801 Allow Customization of Elasticsearch Document ID (nickwallen) closes apache/metron#1218


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/90c5e1d2
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/90c5e1d2
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/90c5e1d2

Branch: refs/heads/master
Commit: 90c5e1d2896b8a95bd6c928e3dab96f1fd85167c
Parents: 9b67100
Author: nickwallen <ni...@nickallen.org>
Authored: Thu Oct 11 16:29:40 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Thu Oct 11 16:29:40 2018 -0400

----------------------------------------------------------------------
 Upgrading.md                                    |  23 ++-
 .../METRON/CURRENT/configuration/metron-env.xml |   9 ++
 .../CURRENT/package/scripts/metron_service.py   |   5 +
 .../package/scripts/params/params_linux.py      |   1 +
 .../METRON/CURRENT/themes/metron_theme.json     |  10 ++
 metron-platform/metron-elasticsearch/README.md  |  37 ++++-
 metron-platform/metron-elasticsearch/pom.xml    |  17 ---
 .../dao/ElasticsearchRetrieveLatestDao.java     | 147 ++++++++++++-------
 .../dao/ElasticsearchSearchDao.java             |  28 +++-
 .../elasticsearch/utils/ElasticsearchUtils.java | 100 +++++++++----
 .../writer/ElasticsearchWriter.java             |  39 +++--
 .../elasticsearch/dao/ElasticsearchDaoTest.java |  10 +-
 .../ElasticsearchSearchIntegrationTest.java     |  20 ++-
 .../indexing/dao/SearchIntegrationTest.java     |  21 +--
 .../integration/SolrSearchIntegrationTest.java  |  19 +++
 15 files changed, 341 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/Upgrading.md
----------------------------------------------------------------------
diff --git a/Upgrading.md b/Upgrading.md
index 2124ac5..c3b2a0f 100644
--- a/Upgrading.md
+++ b/Upgrading.md
@@ -16,9 +16,22 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 # Upgrading
+
 This document constitutes a per-version listing of changes of
 configuration which are non-backwards compatible.
 
+## 0.6.0 to 0.6.1
+
+### [METRON-1801 Allow Customization of Elasticsearch Document ID](https://issues.apache.org/jira/browse/METRON-1801)
+
+A global property named `es.document.id` was added to define the field from which the document ID is set when a message is indexed by Elasticsearch. To allow Elasticsearch to define its own document id, this property should be set to a blank or empty string. The client will not set the document ID and Elasticsearch will define its own. In most cases allowing Elasticsearch to define the document ID is the most performant option. This is now the default behavior.
+
+Metron versions 0.6.0 and earlier defined the document ID using the Metron GUID, which is a randomized UUID using Java's `UUID.randomUUID()`. Using a randomized UUID can negatively impact Elasticsearch indexing performance. To maintain backwards compatibility with legacy versions of Metron use the following global property setting.
+
+    ```
+    es.document.id = guid
+    ```
+
 ## 0.4.2 to 0.5.0
 
 ### [METRON-941: native PaloAlto parser corrupts message when having a comma in the payload](https://issues.apache.org/jira/browse/METRON-941)
@@ -89,7 +102,7 @@ For a more detailed description, please see metron-platform/metron-elasticsearch
 
 ### Description
 
-In the 0.4.2 release, 
+In the 0.4.2 release,
 
 ## 0.3.1 to 0.4.0
 
@@ -107,7 +120,7 @@ This effectively limits the build environment to Docker supported [platforms](ht
 #### Description
 
 As of 0.3.0 the indexing configuration
-* Is held in the enrichment configuration for a sensor 
+* Is held in the enrichment configuration for a sensor
 * Has properties which control every writers (i.e. HDFS, solr or elasticsearch).
 
 In the 0.3.1 release, this configuration has been broken out
@@ -136,7 +149,7 @@ You would create a file to configure each writer for sensor `foo` called `$METRO
     "batchSize" : 100,
     "enabled" : true
   },
-  "hdfs" : { 
+  "hdfs" : {
     "index" : "foo",
     "batchSize" : 100,
     "enabled" : true
@@ -151,7 +164,7 @@ You would create a file to configure each writer for sensor `foo` called `$METRO
 As of 0.3.0, threat triage rules were defined as a simple Map associating a Stellar expression with a score.
 As of 0.3.1, due to the fact that there may be many threat triage rules, we have made the rules more complex.
 To help organize these, we have made the threat triage objects in their own right that contain optional name and optional comment fields.
-   
+
 This essentially makes the risk level rules slightly more complex.  The format goes from:
 ```
 "riskLevelRules" : {
@@ -169,7 +182,7 @@ to:
      }
 ]
 ```
-   
+
 #### Migration
 
 For every sensor enrichment configuration, you will need to migrate the `riskLevelRules` section

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index 81dda6c..0c7d295 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -122,6 +122,15 @@
         </value-attributes>
     </property>
     <property>
+        <name>es_document_id</name>
+        <description>Message field containing the Elasticsearch document ID</description>
+        <value></value>
+        <display-name>Elasticsearch Document ID Source Field</display-name>
+        <value-attributes>
+            <empty-value-valid>true</empty-value-valid>
+        </value-attributes>
+    </property>
+    <property>
         <name>solr_zookeeper_url</name>
         <value>{{zookeeper_quorum}}</value>
         <description>Comma delimited list of Zookeeper Urls: zkhost1:2181,zkhost1:2181</description>

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
index 9d15e93..9b9d99b 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
@@ -83,6 +83,11 @@ def elasticsearch_global_config_patches():
         "op": "add",
         "path": "/es.date.format",
         "value": "{{es_date_format}}"
+    },
+    {
+        "op": "add",
+        "path": "/es.document.id",
+        "value": "{{es_document_id}}"
     }
   """
 

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 458a7be..316964a 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -101,6 +101,7 @@ es_url = ",".join([host + ":" + es_binary_port for host in es_host_list])
 es_http_port = config['configurations']['metron-env']['es_http_port']
 es_http_url = es_host_list[0] + ":" + es_http_port
 es_date_format = config['configurations']['metron-env']['es_date_format']
+es_document_id = config['configurations']['metron-env']['es_document_id']
 
 # hadoop params
 stack_root = Script.get_stack_root()

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index 7e6c83a..34eff50 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -476,6 +476,10 @@
           "subsection-name": "subsection-index-settings"
         },
         {
+          "config": "metron-env/es_document_id",
+          "subsection-name": "subsection-index-settings"
+        },
+        {
           "config": "metron-env/solr_zookeeper_url",
           "subsection-name": "subsection-index-settings"
         },
@@ -949,6 +953,12 @@
         }
       },
       {
+        "config": "metron-env/es_document_id",
+        "widget": {
+          "type": "text-field"
+        }
+      },
+      {
         "config": "metron-env/solr_zookeeper_url",
         "widget": {
           "type": "text-field"

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/README.md b/metron-platform/metron-elasticsearch/README.md
index 177412e..25e15b4 100644
--- a/metron-platform/metron-elasticsearch/README.md
+++ b/metron-platform/metron-elasticsearch/README.md
@@ -59,6 +59,39 @@ For instance, an `es.date.format` of `yyyy.MM.dd.HH` would have the consequence
 roll hourly, whereas an `es.date.format` of `yyyy.MM.dd` would have the consequence that the indices would
 roll daily.
 
+### `es.document.id`
+
+This property sets the message field that is used to define the document ID when a message is indexed by Elasticsearch.  By default, the client does not set the document ID and document ID generation is deferred to Elasticsearch. 
+
+#### Option 1: Defer to Elasticsearch
+
+Value: (Undefined, blank or empty string)
+
+* This option allows Elasticsearch to generate the document ID. 
+* The client will not set a document ID. 
+* In most cases this is the most performant option.
+* This is the default behavior.
+
+#### Option 2: Legacy Compatibility
+
+Value: guid
+
+* Metron versions 0.6.0 and earlier defined the document ID using the Metron GUID, which is a randomized UUID using Java's `UUID.randomUUID()`. 
+* Using a randomized UUID can negatively impact Elasticsearch indexing performance. 
+* To maintain backwards compatibility with legacy versions of Metron, set the value to `guid`.
+    ```
+    es.document.id = guid
+    ``` 
+
+#### Option 3: Custom Document ID
+
+* Advanced users can define a custom document ID.
+* Create an enrichment that defines a new message field; for example one called `my_document_id`. Use this field to set the document ID. This will set the document ID to the value of the message field `my_document_id`.
+    ```
+    es.document.id = my_document_id
+    ```
+* If a message does not contain the `es.document.id` field, a warning is issued and no document ID is set by the client.
+
 ## Upgrading to 5.6.2
 
 Users should be prepared to re-index when migrating from Elasticsearch 2.3.3 to 5.6.2. There are a number of template changes, most notably around
@@ -321,7 +354,7 @@ We'll want to put the template back into Elasticsearch:
 curl -XPUT "http://${ELASTICSEARCH}:9200/_template/${SENSOR}_index" -d @${SENSOR}.template
 ```
 
-To update existing indexes, update Elasticsearch mappings with the new field for each sensor. 
+To update existing indexes, update Elasticsearch mappings with the new field for each sensor.
 
 ```
 curl -XPUT "http://${ELASTICSEARCH}:9200/${SENSOR}_index*/_mapping/${SENSOR}_doc" -d '
@@ -340,7 +373,7 @@ rm ${SENSOR}.template
 
 The stock set of Elasticsearch templates for bro, snort, yaf, error index and meta index are installed automatically during the first time install and startup of Metron Indexing service.
 
-It is possible that Elasticsearch service is not available when the Metron Indexing Service startup, in that case the Elasticsearch template will not be installed. 
+It is possible that Elasticsearch service is not available when the Metron Indexing Service startup, in that case the Elasticsearch template will not be installed.
 
 For such a scenario, an Admin can have the template installed in two ways:
 

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml
index adc601a..edec1bb 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -206,24 +206,7 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>${global_log4j_core_version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>${global_log4j_core_version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava-testlib</artifactId>
-            <version>${global_guava_version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
-
     <build>
         <plugins>
             <plugin>

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
index f6bfeda..2f00b68 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -20,27 +20,36 @@ package org.apache.metron.elasticsearch.dao;
 
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Function;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.Constants;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.RetrieveLatestDao;
 import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
 
 public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
 
+  protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private TransportClient transportClient;
 
   public ElasticsearchRetrieveLatestDao(TransportClient transportClient) {
@@ -49,7 +58,7 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
 
   @Override
   public Document getLatest(String guid, String sensorType) {
-    Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
+    Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(hit));
     return doc.orElse(null);
   }
 
@@ -61,26 +70,19 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
       guids.add(getRequest.getGuid());
       sensorTypes.add(getRequest.getSensorType());
     }
-    List<Document> documents = searchByGuids(
-        guids,
-        sensorTypes,
-        hit -> {
-          Long ts = 0L;
-          String doc = hit.getSourceAsString();
-          String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
-          try {
-            return Optional.of(new Document(doc, hit.getId(), sourceType, ts));
-          } catch (IOException e) {
-            throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
-          }
-        }
-
-    );
-    return documents;
+    return searchByGuids(guids, sensorTypes, hit -> toDocument(hit));
   }
 
-  <T> Optional<T> searchByGuid(String guid, String sensorType,
-      Function<SearchHit, Optional<T>> callback) {
+  /**
+   * Search for a Metron GUID.
+   *
+   * @param guid The Metron GUID to search for.
+   * @param sensorType The sensor type to include in the search.
+   * @param callback A callback that transforms a search hit to a search result.
+   * @param <T> The type of search results expected.
+   * @return All documents with the given Metron GUID and sensor type.
+   */
+  <T> Optional<T> searchByGuid(String guid, String sensorType, Function<SearchHit, Optional<T>> callback) {
     Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null;
     List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
     if (results.size() > 0) {
@@ -91,48 +93,69 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
   }
 
   /**
-   * Return the search hit based on the UUID and sensor type.
-   * A callback can be specified to transform the hit into a type T.
-   * If more than one hit happens, the first one will be returned.
+   * Search for one or more Metron GUIDs.
+   *
+   * @param guids The Metron GUIDs to search by.
+   * @param sensorTypes The sensor types to include in the search.
+   * @param callback A callback that transforms a search hit to a search result.
+   * @param <T> The type of expected results.
+   * @return All documents with the given Metron GUID and sensor type.
    */
-  <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
-      Function<SearchHit, Optional<T>> callback) {
+  <T> List<T> searchByGuids(Collection<String> guids,
+                            Collection<String> sensorTypes,
+                            Function<SearchHit, Optional<T>> callback) {
     if (guids == null || guids.isEmpty()) {
       return Collections.emptyList();
     }
-    QueryBuilder query = null;
-    IdsQueryBuilder idsQuery;
-    if (sensorTypes != null) {
-      String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc")
-          .toArray(String[]::new);
-      idsQuery = QueryBuilders.idsQuery(types);
-    } else {
-      idsQuery = QueryBuilders.idsQuery();
-    }
 
-    for (String guid : guids) {
-      query = idsQuery.addIds(guid);
+    // search by metron's GUID field
+    QueryBuilder query = boolQuery().must(termsQuery(Constants.GUID, guids));
+    SearchRequestBuilder request = transportClient
+            .prepareSearch()
+            .setQuery(query)
+            .setSize(guids.size());
+
+    if(sensorTypes != null) {
+      request.setTypes(toDocTypes(sensorTypes));
     }
 
-    SearchRequestBuilder request = transportClient.prepareSearch()
-        .setQuery(query)
-        .setSize(guids.size());
-    org.elasticsearch.action.search.SearchResponse response = request.get();
-    SearchHits hits = response.getHits();
+    // transform the search hits to results
+    SearchHits hits = request.get().getHits();
+    return getResults(hits, callback);
+  }
+
+  /**
+   * Retrieve search results.
+   *
+   * @param hits The search hits.
+   * @param callback A callback function that transforms search hits to search results.
+   * @param <T> The expected type of search result.
+   * @return The search results.
+   */
+  private <T> List<T> getResults(SearchHits hits, Function<SearchHit, Optional<T>> callback) {
     List<T> results = new ArrayList<>();
+
     for (SearchHit hit : hits) {
       Optional<T> result = callback.apply(hit);
       if (result.isPresent()) {
         results.add(result.get());
       }
     }
+
     return results;
   }
 
-  private Optional<Document> toDocument(final String guid, SearchHit hit) {
+  /**
+   * Transforms a {@link SearchHit} to a {@link Document}.
+   *
+   * @param hit The result of a search.
+   * @return An optional {@link Document}.
+   */
+  private Optional<Document> toDocument(SearchHit hit) {
     Long ts = 0L;
     String doc = hit.getSourceAsString();
     String sourceType = toSourceType(hit.getType());
+    String guid = ElasticsearchUtils.getGUID(hit);
     try {
       return Optional.of(new Document(doc, guid, sourceType, ts));
     } catch (IOException e) {
@@ -148,4 +171,26 @@ public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
   private String toSourceType(String docType) {
     return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
   }
+
+  /**
+   * Returns the doc types for a given collection of sensor types.
+   *
+   * @param sensorTypes The sensor types.
+   * @return The doc types associated with each sensor.
+   */
+  private String[] toDocTypes(Collection<String> sensorTypes) {
+    String[] result;
+
+    if(sensorTypes != null && sensorTypes.size() > 0) {
+      result = sensorTypes
+              .stream()
+              .map(sensorType -> sensorType + "_doc")
+              .toArray(String[]::new);
+
+    } else {
+      result = new String[0];
+    }
+
+    return result;
+  }
 }
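
The DAO diff above replaces the IDs query with a terms query on the Metron GUID field, since the document ID may no longer equal the GUID. The difference can be illustrated without Elasticsearch. This is a rough sketch under assumptions: an in-memory map stands in for an index, `GuidLookupSketch` and `findByGuids` are hypothetical names, and the field name `guid` is assumed to be the value of `Constants.GUID`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GuidLookupSketch {

  /**
   * Hypothetical stand-in for a search on the GUID field.
   *
   * @param index Maps a document ID to that document's source fields.
   * @param guids The Metron GUIDs to look for.
   * @return The sources of all documents whose "guid" field matches.
   */
  public static List<Map<String, Object>> findByGuids(Map<String, Map<String, Object>> index,
                                                      Collection<String> guids) {
    List<Map<String, Object>> results = new ArrayList<>();
    for (Map<String, Object> source : index.values()) {
      // match on the "guid" field inside the source, not on the document ID (the map key)
      if (guids.contains(source.get("guid"))) {
        results.add(source);
      }
    }
    return results;
  }

  public static void main(String[] args) {
    Map<String, Map<String, Object>> index = new HashMap<>();
    // the document ID is chosen by the store and is unrelated to the GUID
    index.put("generated-id-1", Collections.singletonMap("guid", "guid-a"));
    index.put("generated-id-2", Collections.singletonMap("guid", "guid-b"));

    // lookup by GUID finds the document
    System.out.println(findByGuids(index, Collections.singleton("guid-a")).size());
    // a document ID is not a GUID, so it matches nothing
    System.out.println(findByGuids(index, Collections.singleton("generated-id-1")).size());
  }
}
```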

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
index 5cd0a4d..c944578 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
@@ -33,6 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
+
+import org.apache.metron.common.Constants;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.FieldType;
@@ -74,6 +76,8 @@ import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.metron.common.Constants.GUID;
+
 public class ElasticsearchSearchDao implements SearchDao {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -284,19 +288,29 @@ public class ElasticsearchSearchDao implements SearchDao {
         .toArray(value -> new String[indices.size()]);
   }
 
-  private SearchResult getSearchResult(SearchHit searchHit, List<String> fields) {
-    SearchResult searchResult = new SearchResult();
-    searchResult.setId(searchHit.getId());
+  /**
+   * Transforms a {@link SearchHit} to a {@link SearchResult}.
+   *
+   * @param searchHit The result of a search request.
+   * @param fieldsToKeep The source fields to keep.  All others are excluded.  If null, all source fields are kept.
+   * @return A search result.
+   */
+  private SearchResult getSearchResult(SearchHit searchHit, List<String> fieldsToKeep) {
+
     Map<String, Object> source;
-    if (fields != null) {
-      Map<String, Object> resultSourceAsMap = searchHit.getSourceAsMap();
+    if (fieldsToKeep != null) {
       source = new HashMap<>();
-      fields.forEach(field -> {
-        source.put(field, resultSourceAsMap.get(field));
+      // must always keep the GUID
+      fieldsToKeep.add(Constants.GUID);
+      fieldsToKeep.forEach(field -> {
+        source.put(field, searchHit.getSourceAsMap().get(field));
       });
     } else {
       source = searchHit.getSource();
     }
+
+    SearchResult searchResult = new SearchResult();
+    searchResult.setId(ElasticsearchUtils.getGUID(searchHit));
     searchResult.setSource(source);
     searchResult.setScore(searchHit.getScore());
     searchResult.setIndex(searchHit.getIndex());

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index 98dc66d..31503fa 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -17,26 +17,9 @@
  */
 package org.apache.metron.elasticsearch.utils;
 
-import static java.lang.String.format;
-
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.commons.lang.StringUtils;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.HDFSUtils;
@@ -53,10 +36,29 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.apache.metron.common.Constants.GUID;
+
 public class ElasticsearchUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -65,6 +67,14 @@ public class ElasticsearchUtils {
   private static final String USERNAME_CONFIG_KEY = "es.xpack.username";
   private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user";
 
+  /**
+   * Defines the message field from which the document identifier is set.
+   *
+   * <p>If defined, the value of the specified message field is set as the Elasticsearch doc ID. If
+   * this field is undefined or blank, then the document identifier is not set.
+   */
+  public static final String DOC_ID_SOURCE_FIELD = "es.document.id";
+  public static final String DOC_ID_SOURCE_FIELD_DEFAULT = "";
 
   private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
           = ThreadLocal.withInitial(() -> new HashMap<>());
@@ -373,16 +383,48 @@ public class ElasticsearchUtils {
    * @param searchResponse An Elasticsearch SearchHit to be converted.
    * @return The list of SearchResults for the SearchHit
    */
-  protected static List<SearchResult> getSearchResults(
-      org.elasticsearch.action.search.SearchResponse searchResponse) {
-    return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> {
-          SearchResult searchResult = new SearchResult();
-          searchResult.setId(searchHit.getId());
-          searchResult.setSource(searchHit.getSource());
-          searchResult.setScore(searchHit.getScore());
-          searchResult.setIndex(searchHit.getIndex());
-          return searchResult;
-        }
-    ).collect(Collectors.toList());
+  protected static List<SearchResult> getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) {
+    SearchHit[] searchHits = searchResponse.getHits().getHits();
+    return Arrays.stream(searchHits)
+            .map(hit -> toSearchResult(hit))
+            .collect(Collectors.toList());
+  }
+
+  /**
+   * Transforms a {@link SearchHit} to a {@link SearchResult}.
+   *
+   * @param searchHit The search hit to transform.
+   * @return A {@link SearchResult} representing the {@link SearchHit}.
+   */
+  protected static SearchResult toSearchResult(SearchHit searchHit) {
+    SearchResult searchResult = new SearchResult();
+    searchResult.setId(getGUID(searchHit));
+    searchResult.setSource(searchHit.getSource());
+    searchResult.setScore(searchHit.getScore());
+    searchResult.setIndex(searchHit.getIndex());
+    return searchResult;
+  }
+
+  /**
+   * Retrieves the Metron GUID from a {@link SearchHit}.
+   *
+   * @param searchHit The search hit containing a Metron GUID.
+   * @return The Metron GUID.
+   */
+  public static String getGUID(SearchHit searchHit) {
+    String guid;
+    if(searchHit.hasSource() && searchHit.getSource().containsKey(GUID)) {
+      guid = (String) searchHit.getSource().get(GUID);
+
+    } else if(!searchHit.hasSource()) {
+      String template = "No source found, has it been disabled in the mapping? index=%s, docId=%s";
+      throw new IllegalStateException(String.format(template, searchHit.getIndex(), searchHit.getId()));
+
+    } else {
+      String template = "Missing expected field; field=%s, index=%s, docId=%s";
+      throw new IllegalStateException(String.format(template, GUID, searchHit.getIndex(), searchHit.getId()));
+    }
+
+    return guid;
   }
 }
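
The GUID lookup added to ElasticsearchUtils above can be read in isolation as the following minimal sketch. It is not the Metron implementation: a plain Map stands in for the Elasticsearch SearchHit, the class name GuidLookupSketch and the index/docId parameters are hypothetical, and the value "guid" for the GUID field name is an assumption about Constants.GUID.

```java
import java.util.HashMap;
import java.util.Map;

class GuidLookupSketch {

  // Assumption: mirrors the value of Constants.GUID in Metron.
  static final String GUID = "guid";

  /**
   * Returns the GUID stored in a document's source.
   *
   * @param source The document source, or null when no source was returned.
   * @param index  The index holding the document; used only in error messages.
   * @param docId  The Elasticsearch document ID; used only in error messages.
   */
  static String getGuid(Map<String, Object> source, String index, String docId) {
    if (source == null) {
      // no source at all, for example when _source is disabled in the mapping
      String template = "No source found, has it been disabled in the mapping? index=%s, docId=%s";
      throw new IllegalStateException(String.format(template, index, docId));
    }
    if (!source.containsKey(GUID)) {
      // a source exists, but the expected field is absent
      String template = "Missing expected field; field=%s, index=%s, docId=%s";
      throw new IllegalStateException(String.format(template, GUID, index, docId));
    }
    return (String) source.get(GUID);
  }

  public static void main(String[] args) {
    Map<String, Object> source = new HashMap<>();
    source.put(GUID, "bro_1");
    // the result identifier comes from the source, not from the document ID
    System.out.println(getGuid(source, "bro_index", "someDocId")); // prints "bro_1"
  }
}
```

The point of the sketch is that the search result ID is taken from the document source rather than from the Elasticsearch document ID, so the two may differ once the document ID is customized or left for Elasticsearch to generate.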

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 4b8dd08..9a18e8c 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.field.FieldNameConverter;
@@ -43,6 +45,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD;
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.DOC_ID_SOURCE_FIELD_DEFAULT;
+
 /**
  * A {@link BulkMessageWriter} that writes messages to Elasticsearch.
  */
@@ -71,36 +76,48 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
 
   @Override
   public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
-
-    // fetch the field name converter for this sensor type
+    // writer settings
     FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
-
     final String indexPostfix = dateFormat.format(new Date());
+    final String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
+    final String docType = sensorType + "_doc";
+    final String docIdSourceField = (String) configurations.getGlobalConfig().getOrDefault(DOC_ID_SOURCE_FIELD, DOC_ID_SOURCE_FIELD_DEFAULT);
+
     BulkRequestBuilder bulkRequest = client.prepareBulk();
     for(JSONObject message: messages) {
 
+      // clone the message to use as the document that will be indexed
       JSONObject esDoc = new JSONObject();
       for(Object k : message.keySet()){
         copyField(k.toString(), message, esDoc, fieldNameConverter);
       }
 
-      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
-      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName, sensorType + "_doc");
-      indexRequestBuilder = indexRequestBuilder.setSource(esDoc.toJSONString());
-      String guid = (String)esDoc.get(Constants.GUID);
-      if(guid != null) {
-        indexRequestBuilder.setId(guid);
+      IndexRequestBuilder indexRequestBuilder = client
+              .prepareIndex(indexName, docType)
+              .setSource(esDoc.toJSONString());
+
+      // set the document identifier
+      if(StringUtils.isNotBlank(docIdSourceField)) {
+        String docId = (String) esDoc.get(docIdSourceField);
+        if(docId != null) {
+          indexRequestBuilder.setId(docId);
+        } else {
+          LOG.warn("Message is missing document ID source field; document ID not set; sourceField={}", docIdSourceField);
+        }
       }
 
-      Object ts = esDoc.get("timestamp");
+      // set the document timestamp, if one exists
+      Object ts = esDoc.get(Constants.Fields.TIMESTAMP.getName());
       if(ts != null) {
-        indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());
+        indexRequestBuilder.setTimestamp(ts.toString());
       }
 
       bulkRequest.add(indexRequestBuilder);
     }
 
     BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+    LOG.info("Wrote {} message(s) to Elasticsearch; sensorType={}, index={}, docType={}, took={}",
+            ArrayUtils.getLength(bulkResponse.getItems()), sensorType, indexName, docType, bulkResponse.getTook().format());
     return buildWriteReponse(tuples, bulkResponse);
   }
 

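The document ID selection in the ElasticsearchWriter change above can be sketched on its own as follows. This is an illustration under stated assumptions, not the writer itself: plain Maps stand in for the global config and the indexed document, the class name DocIdSketch and the method resolveDocId are hypothetical, and field name conversion is ignored. Only the two constants are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class DocIdSketch {

  // Constants as defined in the ElasticsearchUtils diff.
  static final String DOC_ID_SOURCE_FIELD = "es.document.id";
  static final String DOC_ID_SOURCE_FIELD_DEFAULT = "";

  /**
   * Resolves the document ID for a message, if one should be set.
   *
   * @param globalConfig Stand-in for the global configuration.
   * @param message      Stand-in for the document about to be indexed.
   * @return The document ID, or empty when Elasticsearch should generate one.
   */
  static Optional<String> resolveDocId(Map<String, Object> globalConfig, Map<String, Object> message) {
    Object configured = globalConfig.getOrDefault(DOC_ID_SOURCE_FIELD, DOC_ID_SOURCE_FIELD_DEFAULT);
    String sourceField = configured == null ? "" : configured.toString();
    if (sourceField.trim().isEmpty()) {
      // undefined or blank setting: do not set a document ID
      return Optional.empty();
    }
    // configured field absent from the message: the writer logs a warning and sets no ID
    return Optional.ofNullable(message.get(sourceField)).map(Object::toString);
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put(DOC_ID_SOURCE_FIELD, "guid");

    Map<String, Object> message = new HashMap<>();
    message.put("guid", "bro_1");

    System.out.println(resolveDocId(config, message));
    System.out.println(resolveDocId(new HashMap<>(), message));
  }
}
```

With "es.document.id" set to "guid" the sketch yields the message GUID as the document ID, which corresponds to the behavior before this commit; with the setting absent or blank no ID is set.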
http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index 6c3c327..a2c8de5 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -29,6 +29,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.metron.common.Constants;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.FieldType;
@@ -57,16 +59,20 @@ public class ElasticsearchDaoTest {
 
     // setup the mock search hits
     SearchHit hit1 = mock(SearchHit.class);
-    when(hit1.getId()).thenReturn("id1");
+    when(hit1.getId()).thenReturn("docId1");
+    when(hit1.hasSource()).thenReturn(true);
     when(hit1.getSource()).thenReturn(new HashMap<String, Object>() {{
       put("field", "value1");
+      put(Constants.GUID, "id1");
     }});
     when(hit1.getScore()).thenReturn(0.1f);
 
     SearchHit hit2 = mock(SearchHit.class);
-    when(hit2.getId()).thenReturn("id2");
+    when(hit2.getId()).thenReturn("docId2");
+    when(hit2.hasSource()).thenReturn(true);
     when(hit2.getSource()).thenReturn(new HashMap<String, Object>() {{
       put("field", "value2");
+      put(Constants.GUID, "id2");
     }});
     when(hit2.getScore()).thenReturn(0.2f);
 

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 1d2d48e..df41cba 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -168,8 +168,6 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
     dao.search(request);
   }
 
-
-
   @Override
   public void returns_column_metadata_for_specified_indices() throws Exception {
     // getColumnMetadata with only bro
@@ -261,6 +259,24 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
     Assert.assertEquals("data 1", results.get(0).getSource().get("ttl"));
   }
 
+  @Test
+  public void queries_fields() throws Exception {
+    SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class);
+    SearchResponse response = getIndexDao().search(request);
+    Assert.assertEquals(10, response.getTotal());
+
+    List<SearchResult> results = response.getResults();
+    Assert.assertEquals(10, response.getResults().size());
+
+    // validate the source fields contained in the search response
+    for (int i = 0; i < 10; ++i) {
+      Map<String, Object> source = results.get(i).getSource();
+      Assert.assertNotNull(source);
+      Assert.assertNotNull(source.get(Constants.Fields.SRC_ADDR.getName()));
+      Assert.assertNotNull(source.get(Constants.GUID));
+    }
+  }
+
   @Override
   protected String getSourceTypeField() {
     return Constants.SENSOR_TYPE.replace('.', ':');

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index cfe5752..43e8c85 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.GetRequest;
@@ -897,24 +898,6 @@ public abstract class SearchIntegrationTest {
   }
 
   @Test
-  public void queries_fields() throws Exception {
-    SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class);
-    SearchResponse response = getIndexDao().search(request);
-    Assert.assertEquals(10, response.getTotal());
-    List<SearchResult> results = response.getResults();
-    for (int i = 0; i < 5; ++i) {
-      Map<String, Object> source = results.get(i).getSource();
-      Assert.assertEquals(1, source.size());
-      Assert.assertNotNull(source.get("ip_src_addr"));
-    }
-    for (int i = 5; i < 10; ++i) {
-      Map<String, Object> source = results.get(i).getSource();
-      Assert.assertEquals(1, source.size());
-      Assert.assertNotNull(source.get("ip_src_addr"));
-    }
-  }
-
-  @Test
   public void sort_by_guid() throws Exception {
     SearchRequest request = JSONUtils.INSTANCE.load(sortByGuidQuery, SearchRequest.class);
     SearchResponse response = getIndexDao().search(request);
@@ -923,7 +906,7 @@ public abstract class SearchIntegrationTest {
     for (int i = 0; i < 5; ++i) {
       Map<String, Object> source = results.get(i).getSource();
       Assert.assertEquals(1, source.size());
-      Assert.assertEquals(source.get("guid"), "bro_" + (i + 1));
+      Assert.assertEquals(source.get(Constants.GUID), "bro_" + (i + 1));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/90c5e1d2/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
index 84f2222..58e9db4 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
@@ -33,6 +34,7 @@ import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.solr.dao.SolrDao;
 import org.apache.metron.solr.integration.components.SolrComponent;
@@ -224,6 +226,23 @@ public class SolrSearchIntegrationTest extends SearchIntegrationTest {
   }
 
   @Test
+  public void queries_fields() throws Exception {
+    SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class);
+    SearchResponse response = getIndexDao().search(request);
+    Assert.assertEquals(10, response.getTotal());
+
+    List<SearchResult> results = response.getResults();
+    Assert.assertEquals(10, response.getResults().size());
+
+    // validate the source fields contained in the search response
+    for (int i = 0; i < 10; ++i) {
+      Map<String, Object> source = results.get(i).getSource();
+      Assert.assertNotNull(source);
+      Assert.assertNotNull(source.get(Constants.Fields.SRC_ADDR.getName()));
+    }
+  }
+
+  @Test
   public void different_type_filter_query() throws Exception {
     thrown.expect(InvalidSearchException.class);
     SearchRequest request = JSONUtils.INSTANCE.load(differentTypeFilterQuery, SearchRequest.class);