You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2018/07/11 15:10:26 UTC
metron git commit: METRON-1656 Create KAKFA_SEEK function
(nickwallen) closes apache/metron#1097
Repository: metron
Updated Branches:
refs/heads/master cbdaee174 -> 7af11b626
METRON-1656 Create KAKFA_SEEK function (nickwallen) closes apache/metron#1097
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/7af11b62
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/7af11b62
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/7af11b62
Branch: refs/heads/master
Commit: 7af11b626e2d30aaaf7c01c364295b0b407fae49
Parents: cbdaee1
Author: nickwallen <ni...@nickallen.org>
Authored: Wed Jul 11 11:10:06 2018 -0400
Committer: nickallen <ni...@apache.org>
Committed: Wed Jul 11 11:10:06 2018 -0400
----------------------------------------------------------------------
.../metron/management/KafkaFunctions.java | 109 +++++++++++++++++++
.../KafkaFunctionsIntegrationTest.java | 92 ++++++++++++++++
2 files changed, 201 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metron/blob/7af11b62/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
index 7c9c23f..76418b6 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -763,6 +764,114 @@ public class KafkaFunctions {
}
/**
+ * KAFKA_SEEK
+ *
+ * <p>Seeks to a specific offset and returns the message.
+ *
+ * <p>Example: Find the message in 'topic', partition 1, offset 1001.
+ * <pre>
+ * {@code
+ * KAFKA_SEEK('topic', 1, 1001)
+ * }
+ * </pre>
+ *
+ * <p>By default, only the message value is returned. By setting the global property
+ * 'stellar.kafka.message.view' = 'rich' the function will return additional Kafka metadata
+ * including the topic, partition, offset, key, and timestamp contained in a map. Setting
+ * this property value to 'simple' or simply not setting the property value, will result
+ * in the default view behavior.
+ */
+ @Stellar(
+ namespace = "KAFKA",
+ name = "SEEK",
+ description = "Seeks to an offset within a topic and returns the message.",
+ params = {
+ "topic - The name of the Kafka topic",
+ "partition - The partition identifier; starts at 0.",
+ "offset - The offset within the partition; starts at 0.",
+ "config - Optional map of key/values that override any global properties."
+ },
+ returns = "The message at the given offset, if the offset exists. Otherwise, returns null."
+ )
+ public static class KafkaSeek implements StellarFunction {
+
+ @Override
+ public Object apply(List<Object> args, Context context) throws ParseException {
+ // required - the topic (arg 0), partition (arg 1), and offset (arg 2) must all be given
+ String topic = getArg("topic", 0, String.class, args);
+ int partition = getArg("partition", 1, Integer.class, args);
+ int offset = getArg("offset", 2, Integer.class, args);
+
+ // optional - a 4th argument, when present, holds property overrides provided by the user
+ Map<String, String> overrides = new HashMap<>();
+ if(args.size() > 3) {
+ overrides = getArg("overrides", 3, Map.class, args);
+ }
+
+ Properties properties = buildKafkaProperties(overrides, context);
+ return seek(topic, partition, offset, properties);
+ }
+
+ /**
+ * Seeks to an offset within a topic partition and returns the message found there.
+ *
+ * @param topic The kafka topic.
+ * @param partition The partition identifier.
+ * @param offset The offset within the given partition.
+ * @param properties Function configuration values.
+ * @return The rendered message at the given offset, or null if none was found within the max wait time.
+ */
+ private Object seek(String topic, int partition, int offset, Properties properties) {
+ final int pollTimeout = getPollTimeout(properties);
+ final int maxWait = getMaxWait(properties);
+ Object message = null;
+ try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
+
+ // keep polling until we have the message or the elapsed time reaches the max wait time
+ long wait = 0L;
+ final long start = clock.currentTimeMillis();
+ while(message == null && wait < maxWait) {
+
+ // (re)assign the partition and seek to the requested offset before each poll
+ TopicPartition topar = new TopicPartition(topic, partition);
+ consumer.assign(Collections.singletonList(topar));
+ consumer.seek(topar, offset);
+
+ // poll kafka for messages
+ for(ConsumerRecord<String, String> record : consumer.poll(pollTimeout)) {
+
+ // kafka may return a message even if the requested offset does not exist; accept only an exact match
+ if(record.offset() == offset && record.partition() == partition) {
+ LOG.debug("KAFKA_SEEK found message; topic={}, partition={}, offset={}", topic, partition, offset);
+ message = render(record, properties);
+ }
+ }
+
+ // how long have we waited since the first poll attempt?
+ wait = clock.currentTimeMillis() - start;
+ if(LOG.isDebugEnabled() && message == null) {
+ LOG.debug("KAFKA_SEEK no message yet; topic={}, partition={}, offset={}, waitTime={} ms",
+ topic, partition, offset, wait);
+ }
+ }
+ }
+
+ return message;
+ }
+
+ @Override
+ public void initialize(Context context) {
+ // no initialization required
+ }
+
+ @Override
+ public boolean isInitialized() {
+ // no initialization required
+ return true;
+ }
+ }
+
+ /**
* Renders the Kafka record into a view.
*
* <p>A user can customize the way in which a Kafka record is rendered by altering
http://git-wip-us.apache.org/repos/asf/metron/blob/7af11b62/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
index 5e045ad..09bce17 100644
--- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/KafkaFunctionsIntegrationTest.java
@@ -113,6 +113,7 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
.withClass(KafkaFunctions.KafkaProps.class)
.withClass(KafkaFunctions.KafkaTail.class)
.withClass(KafkaFunctions.KafkaFind.class)
+ .withClass(KafkaFunctions.KafkaSeek.class)
.withClass(MapFunctions.MapGet.class);
}
@@ -593,6 +594,97 @@ public class KafkaFunctionsIntegrationTest extends BaseIntegrationTest {
}
/**
+ * KAFKA_SEEK should return the message at a given offset.
+ */
+ @Test
+ public void testKafkaSeek() throws Exception {
+
+ // use a unique topic name for this test
+ final String topicName = testName.getMethodName();
+ variables.put("topic", topicName);
+
+ // put 3 messages into the topic; the seeks below expect them at offsets 0, 1, and 2 of partition 0
+ run("KAFKA_PUT(topic, [ message1, message2, message3 ])");
+ {
+ // get the 3rd message from the topic (partition 0, offset 2)
+ Object actual = run("KAFKA_SEEK(topic, 0, 2)");
+ assertEquals(message3, actual);
+ }
+ {
+ // get the 2nd message from the topic (partition 0, offset 1)
+ Object actual = run("KAFKA_SEEK(topic, 0, 1)");
+ assertEquals(message2, actual);
+ }
+ {
+ // get the 1st message from the topic (partition 0, offset 0)
+ Object actual = run("KAFKA_SEEK(topic, 0, 0)");
+ assertEquals(message1, actual);
+ }
+ }
+
+ /**
+ * KAFKA_SEEK should return null if the offset does not exist
+ */
+ @Test
+ public void testKafkaSeekToMissingOffset() throws Exception {
+
+ // use a unique topic name for this test
+ final String topicName = testName.getMethodName();
+ variables.put("topic", topicName);
+
+ // put 3 messages into the topic
+ run("KAFKA_PUT(topic, [ message1, message2, message3 ])");
+
+ // seek to an offset far beyond the 3 messages that were written; expect no message
+ Object actual = run("KAFKA_SEEK(topic, 0, 9999)");
+ assertNull(actual);
+ }
+
+ /**
+ * KAFKA_SEEK should return null if the partition does not exist
+ */
+ @Test
+ public void testKafkaSeekToMissingPartition() throws Exception {
+
+ // use a unique topic name for this test
+ final String topicName = testName.getMethodName();
+ variables.put("topic", topicName);
+
+ // put 3 messages into the topic
+ run("KAFKA_PUT(topic, [ message1, message2, message3 ])");
+
+ // seek within a partition that is expected not to exist; expect no message
+ Object actual = run("KAFKA_SEEK(topic, 99999, 0)");
+ assertNull(actual);
+ }
+
+ /**
+ * KAFKA_SEEK should allow a user to see a detailed view of each Kafka record.
+ */
+ @Test
+ public void testKafkaSeekWithRichView() throws Exception {
+
+ // configure a detailed ('rich') view of each message via the global properties
+ global.put(KafkaFunctions.MESSAGE_VIEW_PROPERTY, KafkaFunctions.MESSAGE_VIEW_RICH);
+
+ // use a unique topic name for this test
+ final String topicName = testName.getMethodName();
+ variables.put("topic", topicName);
+
+ run("KAFKA_PUT(topic, [ message1, message2, message3 ])");
+ Object actual = run("KAFKA_SEEK(topic, 0, 0)");
+
+ // expect a 'rich' view of the record: a map holding the record's metadata along with its value
+ assertTrue(actual instanceof Map);
+ Map<String, Object> view = (Map) actual;
+ assertNull(view.get("key"));
+ assertNotNull(view.get("offset"));
+ assertEquals(0, view.get("partition"));
+ assertEquals(topicName, view.get("topic"));
+ assertEquals(message1, view.get("value"));
+ }
+
+ /**
* Runs a Stellar expression.
* @param expression The expression to run.
*/