Posted to commits@metron.apache.org by mm...@apache.org on 2018/07/11 01:32:44 UTC

[28/50] [abbrv] metron git commit: METRON-1625 Merge master into Solr feature branch (merrimanr) closes apache/metron#1067

METRON-1625 Merge master into Solr feature branch (merrimanr) closes apache/metron#1067


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/2bf66503
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/2bf66503
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/2bf66503

Branch: refs/heads/feature/METRON-1554-pcap-query-panel
Commit: 2bf6650327359ded34d1f96540f13051b8e5c471
Parents: a89a72c
Author: merrimanr <me...@gmail.com>
Authored: Wed Jun 20 10:03:44 2018 -0500
Committer: merrimanr <me...@gmail.com>
Committed: Wed Jun 20 10:03:44 2018 -0500

----------------------------------------------------------------------
 .../elasticsearch/dao/ElasticsearchDao.java     |   4 +
 .../dao/ElasticsearchMetaAlertUpdateDao.java    |  32 ++--
 .../dao/ElasticsearchMetaAlertDaoTest.java      |  50 +++++-
 .../indexing/dao/metaalert/MetaScoresTest.java  |  26 +++
 .../metron/management/KafkaFunctions.java       | 173 +++++++++++++++++--
 .../KafkaFunctionsIntegrationTest.java          | 165 +++++++++++++++++-
 6 files changed, 418 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 3eb86ce..59f25f0 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -89,6 +89,10 @@ public class ElasticsearchDao implements IndexDao {
     return accessConfig;
   }
 
+  public void setAccessConfig(AccessConfig accessConfig) {
+    this.accessConfig = accessConfig;
+  }
+
   @Override
   public synchronized void init(AccessConfig config) {
     if (this.client == null) {
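
A minimal sketch (not from this commit) of why the new setter is handy: it lets a caller, most likely a test, inject a pre-built AccessConfig without going through init().

    import java.util.HashMap;
    import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
    import org.apache.metron.indexing.dao.AccessConfig;

    public class AccessConfigInjectionSketch {
      public static void main(String[] args) {
        ElasticsearchDao dao = new ElasticsearchDao();
        AccessConfig config = new AccessConfig();
        // setGlobalConfigSupplier is the AccessConfig setter used elsewhere in this commit
        config.setGlobalConfigSupplier(HashMap::new);
        // the new setter added above
        dao.setAccessConfig(config);
      }
    }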

http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
index d757dfe..bb79b7a 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.elasticsearch.dao;
 
+import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX;
 import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
 import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -48,11 +49,14 @@ import org.apache.metron.indexing.dao.search.InvalidCreateException;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.query.InnerHitBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 
 public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao {
 
+  private static final String INDEX_NOT_FOUND_INDICES_KEY = "es.index";
+
   private ElasticsearchDao elasticsearchDao;
   private MetaAlertRetrieveLatestDao retrieveLatestDao;
   private int pageSize;
@@ -169,17 +173,23 @@ public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpda
     } else {
       Map<Document, Optional<String>> updates = new HashMap<>();
       updates.put(update, index);
-      // We need to update an alert itself.  Only that portion of the update can be delegated.
-      // We still need to get meta alerts potentially associated with it and update.
-      Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults()
-          .stream()
-          .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(),
-              MetaAlertConstants.METAALERT_TYPE, 0L))
-          .collect(Collectors.toList());
-      // Each meta alert needs to be updated with the new alert
-      for (Document metaAlert : metaAlerts) {
-        if (replaceAlertInMetaAlert(metaAlert, update)) {
-          updates.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex()));
+      try {
+        // We need to update an alert itself.  Only that portion of the update can be delegated.
+        // We still need to get meta alerts potentially associated with it and update.
+        Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream()
+                .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), MetaAlertConstants.METAALERT_TYPE, update.getTimestamp()))
+                .collect(Collectors.toList());
+        // Each meta alert needs to be updated with the new alert
+        for (Document metaAlert : metaAlerts) {
+          replaceAlertInMetaAlert(metaAlert, update);
+          updates.put(metaAlert, Optional.of(METAALERTS_INDEX));
+        }
+      } catch (IndexNotFoundException e) {
+        List<String> indicesNotFound = e.getMetadata(INDEX_NOT_FOUND_INDICES_KEY);
+        // If the metaalerts index does not exist because no metaalerts have been created yet,
+        // assume no metaalerts exist for this alert.  Otherwise rethrow the exception.
+        if (indicesNotFound.size() != 1 || !METAALERTS_INDEX.equals(indicesNotFound.get(0))) {
+          throw e;
         }
       }
 
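A minimal sketch (not from this commit) of the recovery rule implemented above: an IndexNotFoundException is tolerated only when the single missing index is the metaalerts index, which just means no metaalerts have been created yet.

    import java.util.List;
    import org.elasticsearch.index.IndexNotFoundException;

    public class MissingIndexRuleSketch {
      // placeholder for ElasticsearchMetaAlertDao.METAALERTS_INDEX
      static final String METAALERTS_INDEX = "metaalert_index";

      static void rethrowUnlessOnlyMetaAlertIndexIsMissing(IndexNotFoundException e) {
        // Elasticsearch records the names of the missing indices under the "es.index" key
        List<String> missing = e.getMetadata("es.index");
        if (missing == null || missing.size() != 1 || !METAALERTS_INDEX.equals(missing.get(0))) {
          // a sensor index is missing; that is a genuine error
          throw e;
        }
        // otherwise the metaalerts index simply does not exist yet, so the
        // alert update can proceed without touching any metaalerts
      }
    }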

http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
index a3a5f16..70197ea 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java
@@ -18,15 +18,11 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.HBaseDao;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.MultiIndexDao;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.GetRequest;
@@ -37,8 +33,21 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
 public class ElasticsearchMetaAlertDaoTest {
 
 
@@ -131,4 +140,35 @@ public class ElasticsearchMetaAlertDaoTest {
     createRequest.setAlerts(Collections.singletonList(new GetRequest("don't", "care")));
     emaDao.createMetaAlert(createRequest);
   }
+
+  @Test
+  public void testUpdateShouldUpdateOnMissingMetaAlertIndex() throws Exception {
+    ElasticsearchDao elasticsearchDao = mock(ElasticsearchDao.class);
+    ElasticsearchMetaAlertRetrieveLatestDao elasticsearchMetaAlertRetrieveLatestDao = mock(ElasticsearchMetaAlertRetrieveLatestDao.class);
+    MetaAlertConfig metaAlertConfig = mock(MetaAlertConfig.class);
+    ElasticsearchMetaAlertUpdateDao emauDao = spy(new ElasticsearchMetaAlertUpdateDao(elasticsearchDao, elasticsearchMetaAlertRetrieveLatestDao, metaAlertConfig, 1));
+
+    doThrow(new IndexNotFoundException(ElasticsearchMetaAlertDao.METAALERTS_INDEX)).when(emauDao).getMetaAlertsForAlert("alert_one");
+
+    Document update = new Document(new HashMap<>(), "alert_one", "", 0L);
+    emauDao.update(update, Optional.empty());
+
+    Map<Document, Optional<String>> expectedUpdate = new HashMap<Document, Optional<String>>() {{
+      put(update, Optional.empty());
+    }};
+    verify(elasticsearchDao).batchUpdate(expectedUpdate);
+  }
+
+  @Test(expected = IndexNotFoundException.class)
+  public void testUpdateShouldThrowExceptionOnMissingSensorIndex() throws Exception {
+    ElasticsearchDao elasticsearchDao = mock(ElasticsearchDao.class);
+    ElasticsearchMetaAlertRetrieveLatestDao elasticsearchMetaAlertRetrieveLatestDao = mock(ElasticsearchMetaAlertRetrieveLatestDao.class);
+    MetaAlertConfig metaAlertConfig = mock(MetaAlertConfig.class);
+    ElasticsearchMetaAlertUpdateDao emauDao = spy(new ElasticsearchMetaAlertUpdateDao(elasticsearchDao, elasticsearchMetaAlertRetrieveLatestDao, metaAlertConfig, 1));
+
+    doThrow(new IndexNotFoundException("bro")).when(emauDao).getMetaAlertsForAlert("alert_one");
+
+    Document update = new Document(new HashMap<>(), "alert_one", "", 0L);
+    emauDao.update(update, Optional.empty());
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java
index 1359ba9..6ebfad8 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaScoresTest.java
@@ -23,6 +23,7 @@ import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAAL
 import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.THREAT_FIELD_DEFAULT;
 import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.THREAT_SORT_DEFAULT;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
@@ -30,6 +31,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.update.Document;
 import org.junit.Test;
 
@@ -72,4 +76,26 @@ public class MetaScoresTest {
     // by default, the overall threat score is the sum of all child threat scores
     assertEquals(30.0F, threatScore);
   }
+
+  @Test
+  public void testCalculateMetaScoresWithDifferentFieldName() {
+    List<Map<String, Object>> alertList = new ArrayList<>();
+
+    // add an alert with a threat score
+    alertList.add( Collections.singletonMap(MetaAlertConstants.THREAT_FIELD_DEFAULT, 10.0f));
+
+    // create the metaalert
+    Map<String, Object> docMap = new HashMap<>();
+    docMap.put(MetaAlertConstants.ALERT_FIELD, alertList);
+    Document metaalert = new Document(docMap, "guid", MetaAlertConstants.METAALERT_TYPE, 0L);
+
+    // Configure a different threat triage score field name
+    AccessConfig accessConfig = new AccessConfig();
+    accessConfig.setGlobalConfigSupplier(() -> new HashMap<String, Object>() {{
+      put(Constants.THREAT_SCORE_FIELD_PROPERTY, MetaAlertConstants.THREAT_FIELD_DEFAULT);
+    }});
+
+    MetaScores.calculateMetaScores(metaalert, MetaAlertConstants.THREAT_FIELD_DEFAULT, MetaAlertConstants.THREAT_SORT_DEFAULT);
+    assertNotNull(metaalert.getDocument().get(MetaAlertConstants.THREAT_FIELD_DEFAULT));
+  }
 }
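
A hedged sketch of what the new test exercises: the threat triage score field name can come from the global configuration instead of being hard-coded. The property value shown in the comment is an assumption; the constant itself is what the test imports.

    import java.util.Map;
    import org.apache.metron.common.Constants;
    import org.apache.metron.indexing.dao.AccessConfig;
    import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;

    public class ThreatScoreFieldSketch {
      // hypothetical lookup, mirroring the test's global config setup above
      static String threatScoreField(AccessConfig accessConfig) {
        Map<String, Object> globals = accessConfig.getGlobalConfigSupplier().get();
        return (String) globals.getOrDefault(
            Constants.THREAT_SCORE_FIELD_PROPERTY,     // assumed to be "threat.triage.score.field"
            MetaAlertConstants.THREAT_FIELD_DEFAULT);  // fall back to the default field name
      }
    }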

http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
index f256672..7c9c23f 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.management;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -30,7 +31,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.metron.common.system.Clock;
-import org.apache.metron.profiler.client.stellar.Util;
 import org.apache.metron.stellar.common.LambdaExpression;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.stellar.common.utils.JSONUtils;
@@ -66,6 +66,7 @@ import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
  *  KAFKA_GET
  *  KAFKA_PUT
  *  KAFKA_TAIL
+ *  KAFKA_FIND
  *  KAFKA_PROPS
  */
 public class KafkaFunctions {
@@ -98,6 +99,30 @@ public class KafkaFunctions {
   public static final int DEFAULT_MAX_WAIT = 5000;
 
   /**
+   * The key for the global property that defines how a message is returned
+   * from the set of KAFKA functions.
+   *
+   * <p>simple - The result contains only the message value as a string.
+   * <p>rich - The result contains the message value, topic, partition, offset, key, and timestamp.
+   */
+  public static final String MESSAGE_VIEW_PROPERTY = "stellar.kafka.message.view";
+
+  /**
+   * An acceptable value for the 'stellar.kafka.message.view' property. The result
+   * provided will contain only the message value as a string.
+   */
+  public static final String MESSAGE_VIEW_SIMPLE = "simple";
+
+  /**
+   * An acceptable value for the 'stellar.kafka.message.view' property.
+   *
+   * <p>Provides a view of each message with more detailed metadata beyond just the
+   * message value.  The result will contain the message value, topic, partition,
+   * offset, key, and timestamp.
+   */
+  public static final String MESSAGE_VIEW_RICH = "rich";
+
+  /**
    * The default set of Kafka properties.
    */
   private static Properties defaultProperties = defaultKafkaProperties();
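
Enabling the rich view is then a one-line change to the global configuration; a hedged sketch, mirroring what the integration tests below do with their in-memory global config:

    import java.util.HashMap;
    import java.util.Map;

    public class MessageViewConfigSketch {
      public static Map<String, Object> richViewGlobals() {
        Map<String, Object> globalConfig = new HashMap<>();
        // same key/value as MESSAGE_VIEW_PROPERTY / MESSAGE_VIEW_RICH above
        globalConfig.put("stellar.kafka.message.view", "rich");
        return globalConfig;
      }
    }
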
@@ -137,6 +162,12 @@ public class KafkaFunctions {
    *   KAFKA_GET('topic', 1, { "auto.offset.reset": "earliest" })
    *   }
    * </pre>
+   *
+   * <p>By default, only the message value is returned. When the global property
+   * 'stellar.kafka.message.view' is set to 'rich', the function returns a map for each
+   * message containing the value, topic, partition, offset, key, and timestamp. Setting
+   * the property to 'simple', or leaving it unset, results in the default view.
    */
   @Stellar(
           namespace = "KAFKA",
@@ -202,7 +233,8 @@ public class KafkaFunctions {
         while(messages.size() < count && wait < maxWait) {
 
           for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) {
-            messages.add(record.value());
+            Object viewOfMessage = render(record, properties);
+            messages.add(viewOfMessage);
           }
 
           // how long have we waited?
@@ -247,6 +279,12 @@ public class KafkaFunctions {
    *   KAFKA_TAIL('topic', 10)
    *   }
    * </pre>
+   *
+   * <p>By default, only the message value is returned. When the global property
+   * 'stellar.kafka.message.view' is set to 'rich', the function returns a map for each
+   * message containing the value, topic, partition, offset, key, and timestamp. Setting
+   * the property to 'simple', or leaving it unset, results in the default view.
    */
   @Stellar(
           namespace = "KAFKA",
@@ -312,7 +350,8 @@ public class KafkaFunctions {
         while(messages.size() < count && wait < maxWait) {
 
           for(ConsumerRecord<String, String> record: consumer.poll(pollTimeout)) {
-            messages.add(record.value());
+            Object viewOfMessage = render(record, properties);
+            messages.add(viewOfMessage);
           }
 
           // how long have we waited?
@@ -357,6 +396,7 @@ public class KafkaFunctions {
    *  KAFKA_PUT('topic', ["message1"], { "bootstrap.servers": "kafka-broker-1:6667" })
    *  }
    * </pre>
+   *
    */
   @Stellar(
           namespace = "KAFKA",
@@ -394,9 +434,49 @@ public class KafkaFunctions {
 
       // send the messages
       Properties properties = buildKafkaProperties(overrides, context);
-      putMessages(topic, messages, properties);
+      List<RecordMetadata> records = putMessages(topic, messages, properties);
 
-      return null;
+      // render a view of the messages that were written for the user
+      Object view = render(records, properties);
+      return view;
+    }
+
+    /**
+     * Render a view of the {@link RecordMetadata} that resulted from writing
+     * messages to Kafka.
+     *
+     * @param records The record metadata.
+     * @param properties The properties.
+     * @return A view of the record metadata; its form depends on the message view property.
+     */
+    private Object render(List<RecordMetadata> records, Properties properties) {
+
+      Object view;
+      if(MESSAGE_VIEW_RICH.equals(getMessageView(properties))) {
+
+        // build a 'rich' view of the messages that were written
+        List<Object> responses = new ArrayList<>();
+        for(RecordMetadata record: records) {
+
+          // render the 'rich' view of the record
+          Map<String, Object> richView = new HashMap<>();
+          richView.put("topic", record.topic());
+          richView.put("partition", record.partition());
+          richView.put("offset", record.offset());
+          richView.put("timestamp", record.timestamp());
+
+          responses.add(richView);
+        }
+
+        // the rich view is a list of maps containing metadata about how each message was written
+        view = responses;
+
+      } else {
+
+        // otherwise, the view is simply a count of the number of messages written
+        view = CollectionUtils.size(records);
+      }
+      return view;
     }
 
     /**
@@ -407,9 +487,11 @@ public class KafkaFunctions {
      * @param topic The topic to send messages to.
      * @param messages The messages to send.
      * @param properties The properties to use with Kafka.
+     * @return Metadata about all the records written to Kafka.
      */
-    private void putMessages(String topic, List<String> messages, Properties properties) {
+    private List<RecordMetadata> putMessages(String topic, List<String> messages, Properties properties) {
       LOG.debug("KAFKA_PUT sending messages; topic={}, count={}", topic, messages.size());
+      List<RecordMetadata> records = new ArrayList<>();
       try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
 
         List<Future<RecordMetadata>> futures = new ArrayList<>();
@@ -422,11 +504,14 @@ public class KafkaFunctions {
 
         // wait for the sends to complete
         for(Future<RecordMetadata> future : futures) {
-          waitForResponse(future, properties);
+          RecordMetadata record = waitForResponse(future, properties);
+          records.add(record);
         }
 
         producer.flush();
       }
+
+      return records;
     }
 
     /**
@@ -434,19 +519,23 @@ public class KafkaFunctions {
      *
      * @param future The future for the message being sent.
      * @param properties The configuration properties.
-     * @return
+     * @return Metadata about the record that was written to Kafka.
      */
-    private void waitForResponse(Future<RecordMetadata> future, Properties properties) {
+    private RecordMetadata waitForResponse(Future<RecordMetadata> future, Properties properties) {
+      RecordMetadata record = null;
       int maxWait = getMaxWait(properties);
+
       try {
         // wait for the record and then render it for the user
-        RecordMetadata record = future.get(maxWait, TimeUnit.MILLISECONDS);
+        record = future.get(maxWait, TimeUnit.MILLISECONDS);
         LOG.debug("KAFKA_PUT message sent; topic={}, partition={}, offset={}",
                 record.topic(), record.partition(), record.offset());
 
       } catch(TimeoutException | InterruptedException | ExecutionException e) {
         LOG.error("KAFKA_PUT message send failure", e);
       }
+
+      return record;
     }
 
     @Override
@@ -528,6 +617,12 @@ public class KafkaFunctions {
    * KAFKA_FIND('topic', m -> MAP_EXISTS('geo', m), 10)
    * }
    * </pre>
+   *
+   * <p>By default, only the message value is returned. When the global property
+   * 'stellar.kafka.message.view' is set to 'rich', the function returns a map for each
+   * message containing the value, topic, partition, offset, key, and timestamp. Setting
+   * the property to 'simple', or leaving it unset, results in the default view.
    */
   @Stellar(
           namespace = "KAFKA",
@@ -601,7 +696,8 @@ public class KafkaFunctions {
 
             // only keep the message if the filter expression is satisfied
             if(isSatisfied(filter, record.value())) {
-              messages.add(record.value());
+              Object view = render(record, properties);
+              messages.add(view);
 
               // do we have enough messages already?
               if(messages.size() >= count) {
@@ -667,6 +763,41 @@ public class KafkaFunctions {
   }
 
   /**
+   * Renders the Kafka record into a view.
+   *
+   * <p>A user can customize the way in which a Kafka record is rendered by altering
+   * the "stellar.kafka.message.view" property.
+   *
+   * @param record The Kafka record to render.
+   * @param properties The properties that allow a user to customize the rendered view of a record.
+   * @return A view of the record; either just the message value or a map of metadata.
+   */
+  private static Object render(ConsumerRecord<String, String> record, Properties properties) {
+    LOG.debug("Render message; topic={}, partition={}, offset={}",
+            record.topic(), record.partition(), record.offset());
+
+    Object result;
+    if(MESSAGE_VIEW_RICH.equals(getMessageView(properties))) {
+      // build the detailed view of the record
+      Map<String, Object> view = new HashMap<>();
+      view.put("value", record.value());
+      view.put("topic", record.topic());
+      view.put("partition", record.partition());
+      view.put("offset", record.offset());
+      view.put("timestamp", record.timestamp());
+      view.put("key", record.key());
+
+      result = view;
+
+    } else {
+      // default to the simple view
+      result = record.value();
+    }
+
+    return result;
+  }
+
+  /**
    * Manually assigns all partitions in a topic to a consumer
    *
    * @param topic The topic whose partitions will be assigned.
@@ -756,6 +887,23 @@ public class KafkaFunctions {
   }
 
   /**
+   * Determines how Kafka messages should be rendered for the user.
+   *
+   * @param properties The properties.
+   * @return How the Kafka messages should be rendered.
+   */
+  private static String getMessageView(Properties properties) {
+    // defaults to the simple view
+    String messageView = MESSAGE_VIEW_SIMPLE;
+
+    if(properties.containsKey(MESSAGE_VIEW_PROPERTY)) {
+      messageView = ConversionUtils.convert(properties.get(MESSAGE_VIEW_PROPERTY), String.class);
+    }
+
+    return messageView;
+  }
+
+  /**
    * Defines a minimal set of default parameters that can be overridden
    * via the global properties.
    */
@@ -792,6 +940,9 @@ public class KafkaFunctions {
     // set the default poll timeout
     properties.put(POLL_TIMEOUT_PROPERTY, DEFAULT_POLL_TIMEOUT);
 
+    // set the default message view
+    properties.put(MESSAGE_VIEW_PROPERTY, MESSAGE_VIEW_SIMPLE);
+
     return properties;
   }
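
Taken together, the return shapes after this change, as exercised by the integration tests below ('run' is the test harness method that evaluates a Stellar expression; this summary is not code from the commit):

    // simple view (the default)
    Object put = run("KAFKA_PUT(topic, [message1])");  // Integer: count written, e.g. 1
    Object get = run("KAFKA_GET(topic)");              // List<String>: just the message values

    // rich view, i.e. "stellar.kafka.message.view" = "rich"
    Object richPut = run("KAFKA_PUT(topic, [message1])");  // List of maps: topic, partition, offset, timestamp
    Object richGet = run("KAFKA_GET(topic)");              // List of maps: value, topic, partition, offset, key, timestamp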
 

http://git-wip-us.apache.org/repos/asf/metron/blob/2bf66503/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
index d82bb37..5e045ad 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
@@ -48,6 +48,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -153,7 +155,45 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     variables.put("topic", topicName);
 
     // put a message onto the topic
-    run("KAFKA_PUT(topic, [message1])");
+    assertEquals(1, run("KAFKA_PUT(topic, [message1])"));
+
+    // validate the message in the topic
+    assertEquals(Collections.singletonList(message1), run("KAFKA_GET(topic)"));
+  }
+
+  /**
+   * KAFKA_PUT should be able to write multiple messages to a topic.
+   */
+  @Test
+  public void testKafkaPutMultipleMessages() {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put two messages onto the topic
+    assertEquals(2, run("KAFKA_PUT(topic, [message1, message2])"));
+
+    // validate the messages in the topic
+    List<String> expected = new ArrayList<String>() {{
+      add(message1);
+      add(message2);
+    }};
+    assertEquals(expected, run("KAFKA_GET(topic, 2)"));
+  }
+
+  /**
+   * KAFKA_PUT should be able to write a message passed as a String, rather than a List.
+   */
+  @Test
+  public void testKafkaPutOneMessagePassedAsString() {
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put a message onto the topic - the message is just a string, not a list
+    run("KAFKA_PUT(topic, message1)");
 
     // get a message from the topic
     Object actual = run("KAFKA_GET(topic)");
@@ -166,7 +206,40 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
-   * KAFKA_PUT should be able to write a message passed as a String, rather than a List.
+   * KAFKA_PUT should allow a user to see a detailed view of each record that was written.
    */
   @Test
-  public void testKafkaPutOneMessagePassedAsString() {
+  public void testKafkaPutWithRichView() {
+
+    // configure a detailed view of each message
+    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put a message onto the topic - the message is just a string, not a list
+    Object actual = run("KAFKA_PUT(topic, message1)");
+
+    // validate
+    assertTrue(actual instanceof List);
+    List<Object> results = (List) actual;
+    assertEquals(1, results.size());
+
+    // expect a 'rich' view of the record
+    Map<String, Object> view = (Map) results.get(0);
+    assertEquals(topicName, view.get("topic"));
+    assertEquals(0, view.get("partition"));
+    assertEquals(0L, view.get("offset"));
+    assertNotNull(view.get("timestamp"));
+
+  }
+
+  /**
+   * KAFKA_GET should allow a user to see a detailed view of each Kafka record.
+   */
+  @Test
+  public void testKafkaGetWithRichView() {
+
+    // configure a detailed view of each message
+    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
 
     // use a unique topic name for this test
     final String topicName = testName.getMethodName();
@@ -179,7 +252,17 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     Object actual = run("KAFKA_GET(topic)");
 
     // validate
-    assertEquals(Collections.singletonList(message1), actual);
+    assertTrue(actual instanceof List);
+    List<Object> results = (List) actual;
+    assertEquals(1, results.size());
+
+    // expect a 'rich' view of the record
+    Map<String, Object> view = (Map) results.get(0);
+    assertNull(view.get("key"));
+    assertEquals(0L, view.get("offset"));
+    assertEquals(0, view.get("partition"));
+    assertEquals(topicName, view.get("topic"));
+    assertEquals(message1, view.get("value"));
   }
 
   /**
@@ -300,6 +383,45 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
   }
 
   /**
+   * KAFKA_TAIL should allow a user to see a rich view of each Kafka record.
+   */
+  @Test
+  public void testKafkaTailWithRichView() throws Exception {
+
+    // configure a detailed view of each message
+    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // put multiple messages onto the topic; KAFKA_TAIL should NOT retrieve these
+    run("KAFKA_PUT(topic, [message2, message2, message2])");
+
+    // get a message from the topic; will block until messages arrive
+    Future<Object> tailFuture = runAsync("KAFKA_TAIL(topic, 1)");
+
+    // put 10 messages onto the topic for KAFKA_TAIL to grab
+    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message1])"));
+
+    // wait for KAFKA_TAIL to complete
+    Object actual = tailFuture.get(10, TimeUnit.SECONDS);
+
+    // validate
+    assertTrue(actual instanceof List);
+    List<Object> results = (List) actual;
+    assertEquals(1, results.size());
+
+    // expect a 'rich' view of the record
+    Map<String, Object> view = (Map) results.get(0);
+    assertNull(view.get("key"));
+    assertEquals(0, view.get("partition"));
+    assertEquals(topicName, view.get("topic"));
+    assertEquals(message1, view.get("value"));
+    assertNotNull(view.get("offset"));
+  }
+
+  /**
    * KAFKA_PROPS should return the set of properties used to configure the Kafka consumer
    *
    * The properties used for the KAFKA_* functions are calculated by compiling the default, global and user
@@ -339,7 +461,7 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     Map<String, String> properties = (Map<String, String>) run(expression);
     assertEquals(expected, properties.get(overriddenKey));
   }
-  
+
   /**
    * KAFKA_FIND should only return messages that satisfy a filter expression.
    */
@@ -385,6 +507,40 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
   }
 
   /**
+   * KAFKA_FIND should allow a user to see a detailed view of each Kafka record.
+   */
+  @Test
+  public void testKafkaFindWithRichView() throws Exception {
+
+    // configure a detailed view of each message
+    global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
+
+    // use a unique topic name for this test
+    final String topicName = testName.getMethodName();
+    variables.put("topic", topicName);
+
+    // find all messages satisfying the filter expression
+    Future<Object> future = runAsync("KAFKA_FIND(topic, m -> MAP_GET('value', m) == 23)");
+
+    // put 10 messages onto the topic for KAFKA_FIND to grab
+    runAsyncAndWait(Collections.nCopies(10, "KAFKA_PUT(topic, [message2])"));
+
+    // validate
+    Object actual = future.get(10, TimeUnit.SECONDS);
+    assertTrue(actual instanceof List);
+    List<Object> results = (List) actual;
+    assertEquals(1, results.size());
+
+    // expect a 'rich' view of the record
+    Map<String, Object> view = (Map) results.get(0);
+    assertNull(view.get("key"));
+    assertNotNull(view.get("offset"));
+    assertEquals(0, view.get("partition"));
+    assertEquals(topicName, view.get("topic"));
+    assertEquals(message2, view.get("value"));
+  }
+
+  /**
    * KAFKA_FIND should return no more messages than its limit.
    */
   @Test
@@ -491,4 +647,3 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
     }
   }
 }
-