Posted to commits@metamodel.apache.org by ka...@apache.org on 2018/06/06 03:44:15 UTC
[2/8] metamodel git commit: Added ability to query by partition and/or offset
Added ability to query by partition and/or offset
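
For context, the new capability is exercised through the regular MetaModel query
API. A minimal usage sketch, based on the integration test included below (the
broker address "localhost:9092" and topic name "my_topic" are placeholders, not
part of this commit):

    import java.util.Arrays;

    import org.apache.metamodel.DataContext;
    import org.apache.metamodel.data.DataSet;
    import org.apache.metamodel.kafka.KafkaDataContext;

    public class KafkaQueryExample {
        public static void main(String[] args) {
            final DataContext dataContext = new KafkaDataContext<>(String.class, String.class,
                    "localhost:9092", Arrays.asList("my_topic"));

            // "offset > 500" is now pushed down to the Kafka consumer, which seeks
            // to offset 501, rather than filtering rows after consumption.
            try (DataSet dataSet = dataContext.query().from("my_topic").selectAll()
                    .where("offset").gt(500).execute()) {
                while (dataSet.next()) {
                    System.out.println(dataSet.getRow());
                }
            }
        }
    }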
Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/c6be1aea
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/c6be1aea
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/c6be1aea
Branch: refs/heads/master
Commit: c6be1aea314a62fc26e1212248cf9abc9f915bca
Parents: 5b8176e
Author: Kasper Sørensen <i....@gmail.com>
Authored: Sun Jan 28 12:41:50 2018 -0800
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Sun Jan 28 12:42:07 2018 -0800
----------------------------------------------------------------------
kafka/pom.xml | 3 +-
.../metamodel/kafka/KafkaConsumerFactory.java | 15 +-
.../metamodel/kafka/KafkaDataContext.java | 152 +++++++++++++++++-
.../apache/metamodel/kafka/KafkaDataSet.java | 10 +-
.../kafka/KafkaDataContextIntegrationTest.java | 156 +++++++++++++++++++
pom.xml | 1 +
6 files changed, 324 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
index 0227218..52d557a 100644
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -19,6 +19,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>MetaModel-kafka</artifactId>
<name>MetaModel module for Apache Kafka</name>
+
<dependencies>
<dependency>
<groupId>org.apache.metamodel</groupId>
@@ -34,7 +35,7 @@
<!-- Test dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-nop</artifactId>
+ <artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
index 6cfd101..15e5f70 100644
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteBufferDeserializer;
@@ -55,10 +56,16 @@ public class KafkaConsumerFactory implements ConsumerFactory {
public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) {
final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis();
- final Properties properties = new Properties(baseProperties);
- properties.setProperty("group.id", groupId);
- properties.setProperty("key.deserializer", deserializerForClass(keyClass).getName());
- properties.setProperty("value.deserializer", deserializerForClass(keyClass).getName());
+ final Properties properties = new Properties();
+ baseProperties.stringPropertyNames().forEach(k -> {
+ properties.setProperty(k, baseProperties.getProperty(k));
+ });
+
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass).getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerForClass(valueClass)
+ .getName());
return new KafkaConsumer<>(properties);
}
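
A note on the Properties change above: it most likely works around a classic
java.util.Properties pitfall. Values handed to new Properties(defaults) live in
the defaults table, so they are visible through getProperty(...) but absent from
keySet()/entrySet(), which is how KafkaConsumer reads its configuration - the
base properties were effectively dropped. A standalone sketch of the difference
(not part of the commit):

    import java.util.Properties;

    public class PropertiesDefaultsPitfall {
        public static void main(String[] args) {
            final Properties base = new Properties();
            base.setProperty("bootstrap.servers", "localhost:9092");

            // Values given to new Properties(defaults) land in the defaults table:
            // visible via getProperty(...), invisible to keySet()/entrySet().
            final Properties viaDefaults = new Properties(base);
            System.out.println(viaDefaults.getProperty("bootstrap.servers")); // localhost:9092
            System.out.println(viaDefaults.keySet());                         // []

            // The explicit copy used in createConsumer makes every entry first-class.
            final Properties copied = new Properties();
            base.stringPropertyNames().forEach(k -> copied.setProperty(k, base.getProperty(k)));
            System.out.println(copied.keySet());                              // [bootstrap.servers]
        }
    }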
http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
index f6cd664..eda881c 100644
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
@@ -18,8 +18,12 @@
*/
package org.apache.metamodel.kafka;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -29,7 +33,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.FirstRowDataSet;
import org.apache.metamodel.data.MaxRowsDataSet;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.OperatorType;
+import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.ColumnType;
import org.apache.metamodel.schema.ColumnTypeImpl;
@@ -49,6 +57,11 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
public static final String COLUMN_KEY = "key";
public static final String COLUMN_VALUE = "value";
+ private static final Set<OperatorType> OPTIMIZED_PARTITION_OPERATORS = new HashSet<>(Arrays.asList(
+ OperatorType.EQUALS_TO, OperatorType.IN));
+ private static final Set<OperatorType> OPTIMIZED_OFFSET_OPERATORS = new HashSet<>(Arrays.asList(
+ OperatorType.GREATER_THAN, OperatorType.GREATER_THAN_OR_EQUAL));
+
private final Class<K> keyClass;
private final Class<V> valueClass;
private final ConsumerFactory consumerFactory;
@@ -95,17 +108,148 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
final String topic = table.getName();
final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass);
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
- List<TopicPartition> partitions = partitionInfos.stream().map(partitionInfo -> {
+ final List<TopicPartition> partitions = partitionInfos.stream().map(partitionInfo -> {
return new TopicPartition(topic, partitionInfo.partition());
}).collect(Collectors.toList());
- consumer.seekToBeginning(partitions);
consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+
+ final List<SelectItem> selectItems = columns.stream().map(col -> new SelectItem(col)).collect(Collectors
+ .toList());
+ return materializeMainSchemaTableFromConsumer(consumer, selectItems, 0, maxRows);
+ }
+
+ protected DataSet materializeMainSchemaTableFromConsumer(Consumer<K, V> consumer, List<SelectItem> selectItems,
+ int offset, int maxRows) {
+ DataSet dataSet = new KafkaDataSet<K, V>(consumer, selectItems);
+ if (offset > 0) {
+ dataSet = new FirstRowDataSet(dataSet, offset);
+ }
if (maxRows > 0) {
- return new MaxRowsDataSet(new KafkaDataSet<K, V>(consumer, columns), maxRows);
+ dataSet = new MaxRowsDataSet(dataSet, maxRows);
+ }
+ return dataSet;
+ }
+
+ @Override
+ protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, List<FilterItem> whereItems,
+ int firstRow, int maxRows) {
+ // check if we can optimize the consumption when either "partition" or "offset"
+ // are in the where items.
+ if (!whereItems.isEmpty()) {
+ final boolean optimizable = whereItems.stream().allMatch(this::isOptimizable);
+ if (optimizable) {
+ long offset = 0;
+ List<Integer> partitions = null;
+
+ for (FilterItem whereItem : whereItems) {
+ final OperatorType operator = whereItem.getOperator();
+ switch (whereItem.getSelectItem().getColumn().getName()) {
+ case COLUMN_OFFSET:
+ if (operator == OperatorType.GREATER_THAN) {
+ offset = toLong(whereItem.getOperand()) + 1;
+ } else if (operator == OperatorType.GREATER_THAN_OR_EQUAL) {
+ offset = toLong(whereItem.getOperand());
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ break;
+ case COLUMN_PARTITION:
+ if (operator == OperatorType.EQUALS_TO) {
+ partitions = Arrays.asList(toInt(whereItem.getOperand()));
+ } else if (operator == OperatorType.IN) {
+ partitions = toIntList(whereItem.getOperand());
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ final String topic = table.getName();
+ final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass);
+
+ // handle partition filtering
+ final List<TopicPartition> assignedPartitions;
+ if (partitions == null) {
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+ assignedPartitions = partitionInfos.stream().map(partitionInfo -> {
+ return new TopicPartition(topic, partitionInfo.partition());
+ }).collect(Collectors.toList());
+ } else {
+ assignedPartitions = partitions.stream().map(partitionNumber -> {
+ return new TopicPartition(topic, partitionNumber);
+ }).collect(Collectors.toList());
+ }
+
+ // handle offset filtering
+ consumer.assign(assignedPartitions);
+ if (offset == 0) {
+ consumer.seekToBeginning(assignedPartitions);
+ } else {
+ for (TopicPartition topicPartition : assignedPartitions) {
+ consumer.seek(topicPartition, offset);
+ }
+ }
+
+ return materializeMainSchemaTableFromConsumer(consumer, selectItems, firstRow, maxRows);
+ }
+ }
+ return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
+ }
+
+ private static List<Integer> toIntList(Object operand) {
+ final List<Integer> list = new ArrayList<>();
+ if (operand instanceof Iterable) {
+ ((Iterable<?>) operand).forEach(o -> {
+ list.add(toInt(o));
+ });
}
- return new KafkaDataSet<K, V>(consumer, columns);
+ return list;
}
+ private static int toInt(Object obj) {
+ if (obj instanceof Number) {
+ return ((Number) obj).intValue();
+ }
+ return Integer.parseInt(obj.toString());
+ }
+
+ private static long toLong(Object obj) {
+ if (obj instanceof Number) {
+ return ((Number) obj).longValue();
+ }
+ return Long.parseLong(obj.toString());
+ }
+
+ private boolean isOptimizable(FilterItem whereItem) {
+ if (whereItem.isCompoundFilter()) {
+ return false;
+ }
+ if (whereItem.getExpression() != null) {
+ return false;
+ }
+ final SelectItem selectItem = whereItem.getSelectItem();
+ if (selectItem.getExpression() != null || selectItem.getAggregateFunction() != null || selectItem
+ .getScalarFunction() != null) {
+ return false;
+ }
+ final Column column = selectItem.getColumn();
+ if (column == null) {
+ return false;
+ }
+
+ switch (column.getName()) {
+ case COLUMN_OFFSET:
+ return OPTIMIZED_OFFSET_OPERATORS.contains(whereItem.getOperator());
+ case COLUMN_PARTITION:
+ return OPTIMIZED_PARTITION_OPERATORS.contains(whereItem.getOperator());
+ default:
+ return false;
+ }
+ }
}
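
The net effect of the isOptimizable(...) check above: only direct comparisons on
the "partition" column (= and IN) and the "offset" column (> and >=) are pushed
down to consumer.assign(...)/seek(...); any other predicate falls through to
super.materializeMainSchemaTable(...) and is applied by row post-processing. A
sketch of queries on either side of that line, reusing the dataContext from the
sketch near the top of this message (gt is taken from the integration test;
in/lt are assumed to be the usual MetaModel filter-builder shorthands; each
execute() returns a DataSet that should be iterated and closed as shown above):

    // Pushed down: only partitions 2 and 3 are assigned to the consumer.
    dataContext.query().from("my_topic").selectAll()
            .where("partition").in(2, 3).execute();

    // Pushed down: the consumer seeks each assigned partition to offset 501.
    dataContext.query().from("my_topic").selectAll()
            .where("offset").gt(500).execute();

    // Not pushed down ("<" is outside the optimized operator sets), so this
    // falls back to consuming from the beginning and post-processing rows.
    dataContext.query().from("my_topic").selectAll()
            .where("offset").lt(500).execute();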
http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
index 90dcf38..a214964 100644
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
@@ -20,7 +20,6 @@ package org.apache.metamodel.kafka;
import java.util.Iterator;
import java.util.List;
-import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,7 +29,6 @@ import org.apache.metamodel.data.CachingDataSetHeader;
import org.apache.metamodel.data.DefaultRow;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
final class KafkaDataSet<K, V> extends AbstractDataSet {
@@ -40,12 +38,16 @@ final class KafkaDataSet<K, V> extends AbstractDataSet {
private Iterator<ConsumerRecord<K, V>> currentIterator;
private ConsumerRecord<K, V> currentRow;
- public KafkaDataSet(Consumer<K, V> consumer, List<Column> columns) {
- super(new CachingDataSetHeader(columns.stream().map(col -> new SelectItem(col)).collect(Collectors.toList())));
+ public KafkaDataSet(Consumer<K, V> consumer, List<SelectItem> selectItems) {
+ super(new CachingDataSetHeader(selectItems));
this.consumer = consumer;
this.pollTimeout = Long.parseLong(System.getProperty(KafkaDataContext.SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT,
"1000"));
}
+
+ public Consumer<K, V> getConsumer() {
+ return consumer;
+ }
@Override
public boolean next() {
http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java
new file mode 100644
index 0000000..2d0cf5e
--- /dev/null
+++ b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java
@@ -0,0 +1,156 @@
+package org.apache.metamodel.kafka;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.WrappingDataSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaDataContextIntegrationTest {
+
+ private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+ private static final Logger logger = LoggerFactory.getLogger(KafkaDataContextIntegrationTest.class);
+
+ @Test
+ public void testGetSchemaInfo() {
+ final DataContext dataContext1 = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays
+ .asList("non-existing-topic"));
+
+ Assert.assertEquals("[non-existing-topic, default_table]", dataContext1.getDefaultSchema().getTableNames()
+ .toString());
+
+ final DataContext dataContext2 = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays
+ .asList("test1", "test2", "test3"));
+ Assert.assertEquals("[test1, test2, test3]", dataContext2.getDefaultSchema().getTableNames().toString());
+ }
+
+ @Test
+ public void testQueryNoFilters() {
+ final String topic = "test_" + UUID.randomUUID().toString().replaceAll("\\-", "");
+
+ final DataContext dataContext = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays
+ .asList(topic));
+
+ Assert.assertEquals("[" + topic + ", default_table]", dataContext.getDefaultSchema().getTableNames()
+ .toString());
+
+ final int numRecords = 10000;
+
+ // create a producer thread
+ new Thread(createProducerRunnable(topic, numRecords), "producer").start();
+
+ int counter = 0;
+ try (DataSet dataSet = dataContext.query().from(topic).selectAll().execute()) {
+ logger.info("c: starting");
+ while (dataSet.next()) {
+ counter++;
+ if (counter % 1000 == 0) {
+ logger.info("c: " + counter);
+ logger.info(dataSet.getRow().toString());
+ }
+ }
+ logger.info("c: done - " + counter);
+ }
+
+ Assert.assertEquals(numRecords, counter);
+ }
+
+ @Test
+ public void testQueryUsingOffset() throws InterruptedException {
+ final String topic = "test_" + UUID.randomUUID().toString().replaceAll("\\-", "");
+
+ final DataContext dataContext = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays
+ .asList(topic));
+
+ final int numRecords = 1000;
+ final int queriedOffset = 500;
+
+ // create a producer thread
+ final Thread thread = new Thread(createProducerRunnable(topic, numRecords), "producer");
+ thread.start();
+ thread.join(); // await completion so that the queried offset will exist at query time
+
+ int counter = 0;
+ try (DataSet dataSet = dataContext.query().from(topic).selectAll().where("offset").gt(queriedOffset)
+ .execute()) {
+
+ // check the assignment and position created for the consumer
+ @SuppressWarnings("resource")
+ DataSet innerDataSet = dataSet;
+ if (innerDataSet instanceof WrappingDataSet) {
+ innerDataSet = ((WrappingDataSet) dataSet).getWrappedDataSet();
+ }
+ Assert.assertTrue(innerDataSet instanceof KafkaDataSet);
+ final Consumer<?, ?> consumer = ((KafkaDataSet<?, ?>) innerDataSet).getConsumer();
+ final Set<TopicPartition> assignment = consumer.assignment();
+ for (TopicPartition assignedTopic : assignment) {
+ final long position = consumer.position(assignedTopic);
+ Assert.assertEquals(queriedOffset + 1, position);
+ }
+
+ while (dataSet.next()) {
+ counter++;
+ }
+ }
+
+ // offset is 0 based, so "greater than 500" will leave 499 records
+ Assert.assertEquals(499, counter);
+ }
+
+ private Runnable createProducerRunnable(String topic, int numRecords) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ final Properties properties = new Properties();
+ properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "metamodel-test");
+
+ final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
+ int counter = 0;
+ while (counter < numRecords) {
+ final String key = UUID.randomUUID().toString();
+ final String value = UUID.randomUUID().toString();
+ try {
+ producer.send(new ProducerRecord<String, String>(topic, key, value), new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ logger.info("Callback error");
+ exception.printStackTrace();
+ }
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ break;
+ }
+ if (counter % 1000 == 0) {
+ logger.info("p: " + counter);
+ }
+ counter++;
+ }
+ logger.info("p: closing - " + counter);
+ producer.flush();
+ producer.close();
+ logger.info("p: done - " + counter);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 69d52c4..ee2d087 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@ under the License.
<module>json</module>
<module>xml</module>
<module>jdbc</module>
+ <module>kafka</module>
<module>elasticsearch</module>
<module>hadoop</module>
<module>hbase</module>