You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2018/06/06 03:44:15 UTC

[2/8] metamodel git commit: Added ability to query by partition and/or offset

Added ability to query by partition and/or offset

Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/c6be1aea
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/c6be1aea
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/c6be1aea

Branch: refs/heads/master
Commit: c6be1aea314a62fc26e1212248cf9abc9f915bca
Parents: 5b8176e
Author: Kasper Sørensen <i....@gmail.com>
Authored: Sun Jan 28 12:41:50 2018 -0800
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Sun Jan 28 12:42:07 2018 -0800

----------------------------------------------------------------------
 kafka/pom.xml                                   |   3 +-
 .../metamodel/kafka/KafkaConsumerFactory.java   |  15 +-
 .../metamodel/kafka/KafkaDataContext.java       | 152 +++++++++++++++++-
 .../apache/metamodel/kafka/KafkaDataSet.java    |  10 +-
 .../kafka/KafkaDataContextIntegrationTest.java  | 156 +++++++++++++++++++
 pom.xml                                         |   1 +
 6 files changed, 324 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/kafka/pom.xml b/kafka/pom.xml
index 0227218..52d557a 100644
--- a/kafka/pom.xml
+++ b/kafka/pom.xml
@@ -19,6 +19,7 @@
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>MetaModel-kafka</artifactId>
 	<name>MetaModel module for Apache Kafka</name>
+
 	<dependencies>
 		<dependency>
 			<groupId>org.apache.metamodel</groupId>
@@ -34,7 +35,7 @@
 		<!-- Test dependencies -->
 		<dependency>
 			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-nop</artifactId>
+			<artifactId>slf4j-simple</artifactId>
 			<scope>test</scope>
 		</dependency>
 		<dependency>

http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
index 6cfd101..15e5f70 100644
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Properties;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteBufferDeserializer;
@@ -55,10 +56,16 @@ public class KafkaConsumerFactory implements ConsumerFactory {
     public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) {
         final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis();
 
-        final Properties properties = new Properties(baseProperties);
-        properties.setProperty("group.id", groupId);
-        properties.setProperty("key.deserializer", deserializerForClass(keyClass).getName());
-        properties.setProperty("value.deserializer", deserializerForClass(keyClass).getName());
+        final Properties properties = new Properties();
+        baseProperties.stringPropertyNames().forEach(k -> {
+            properties.setProperty(k, baseProperties.getProperty(k));
+        });
+
+        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass).getName());
+        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerForClass(valueClass)
+                .getName()); // the value deserializer must follow valueClass, not keyClass
         return new KafkaConsumer<>(properties);
     }
 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
index f6cd664..eda881c 100644
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
@@ -18,8 +18,12 @@
  */
 package org.apache.metamodel.kafka;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -29,7 +33,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.QueryPostprocessDataContext;
 import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.FirstRowDataSet;
 import org.apache.metamodel.data.MaxRowsDataSet;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.OperatorType;
+import org.apache.metamodel.query.SelectItem;
 import org.apache.metamodel.schema.Column;
 import org.apache.metamodel.schema.ColumnType;
 import org.apache.metamodel.schema.ColumnTypeImpl;
@@ -49,6 +57,11 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
     public static final String COLUMN_KEY = "key";
     public static final String COLUMN_VALUE = "value";
 
+    private static final Set<OperatorType> OPTIMIZED_PARTITION_OPERATORS = new HashSet<>(Arrays.asList(
+            OperatorType.EQUALS_TO, OperatorType.IN));
+    private static final Set<OperatorType> OPTIMIZED_OFFSET_OPERATORS = new HashSet<>(Arrays.asList(
+            OperatorType.GREATER_THAN, OperatorType.GREATER_THAN_OR_EQUAL));
+
     private final Class<K> keyClass;
     private final Class<V> valueClass;
     private final ConsumerFactory consumerFactory;
@@ -95,17 +108,148 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
         final String topic = table.getName();
         final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass);
         final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-        List<TopicPartition> partitions = partitionInfos.stream().map(partitionInfo -> {
+        final List<TopicPartition> partitions = partitionInfos.stream().map(partitionInfo -> {
             return new TopicPartition(topic, partitionInfo.partition());
         }).collect(Collectors.toList());
 
-        consumer.seekToBeginning(partitions);
         consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+
+        final List<SelectItem> selectItems = columns.stream().map(col -> new SelectItem(col)).collect(Collectors
+                .toList());
 
+        return materializeMainSchemaTableFromConsumer(consumer, selectItems, 0, maxRows);
+    }
+
+    protected DataSet materializeMainSchemaTableFromConsumer(Consumer<K, V> consumer, List<SelectItem> selectItems,
+            int offset, int maxRows) {
+        DataSet dataSet = new KafkaDataSet<K, V>(consumer, selectItems);
+        if (offset > 0) {
+            dataSet = new FirstRowDataSet(dataSet, offset);
+        }
         if (maxRows > 0) {
-            return new MaxRowsDataSet(new KafkaDataSet<K, V>(consumer, columns), maxRows);
+            dataSet = new MaxRowsDataSet(dataSet, maxRows);
+        }
+        return dataSet;
+    }
+
+    @Override
+    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems, List<FilterItem> whereItems,
+            int firstRow, int maxRows) {
+        // Push "partition"/"offset" filters down to the consumer (assign/seek) when every where
+        // item qualifies; the where items are not re-applied afterwards on this path.
+        if (!whereItems.isEmpty()) {
+            final boolean optimizable = whereItems.stream().allMatch(this::isOptimizable);
+            if (optimizable) {
+                long offset = 0;
+                List<Integer> partitions = null; // null = all partitions of the topic
+
+                for (FilterItem whereItem : whereItems) {
+                    final OperatorType operator = whereItem.getOperator();
+                    switch (whereItem.getSelectItem().getColumn().getName()) {
+                    case COLUMN_OFFSET:
+                        if (operator == OperatorType.GREATER_THAN) {
+                            offset = Math.max(offset, Math.addExact(toLong(whereItem.getOperand()), 1L));
+                        } else if (operator == OperatorType.GREATER_THAN_OR_EQUAL) {
+                            offset = Math.max(offset, toLong(whereItem.getOperand())); // strictest bound wins
+                        } else {
+                            throw new UnsupportedOperationException();
+                        }
+                        break;
+                    case COLUMN_PARTITION:
+                        if (operator == OperatorType.EQUALS_TO) {
+                            partitions = Arrays.asList(toInt(whereItem.getOperand()));
+                        } else if (operator == OperatorType.IN) {
+                            partitions = toIntList(whereItem.getOperand());
+                        } else {
+                            throw new UnsupportedOperationException();
+                        }
+                        break;
+                    default:
+                        throw new UnsupportedOperationException();
+                    }
+                }
+
+                final String topic = table.getName();
+                final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass);
+
+                // handle partition filtering
+                final List<TopicPartition> assignedPartitions;
+                if (partitions == null) {
+                    final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+                    assignedPartitions = partitionInfos.stream().map(partitionInfo -> {
+                        return new TopicPartition(topic, partitionInfo.partition());
+                    }).collect(Collectors.toList());
+                } else {
+                    assignedPartitions = partitions.stream().map(partitionNumber -> {
+                        return new TopicPartition(topic, partitionNumber);
+                    }).collect(Collectors.toList());
+                }
+
+                // handle offset filtering
+                consumer.assign(assignedPartitions);
+                if (offset == 0) {
+                    consumer.seekToBeginning(assignedPartitions);
+                } else {
+                    for (TopicPartition topicPartition : assignedPartitions) {
+                        consumer.seek(topicPartition, offset);
+                    }
+                }
+
+                return materializeMainSchemaTableFromConsumer(consumer, selectItems, firstRow, maxRows);
+            }
+        }
+        return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
+    }
+
+    private static List<Integer> toIntList(Object operand) {
+        final List<Integer> list = new ArrayList<>();
+        if (operand instanceof Iterable) {
+            ((Iterable<?>) operand).forEach(o -> {
+                list.add(toInt(o));
+            });
         }
-        return new KafkaDataSet<K, V>(consumer, columns);
+        return list;
     }
 
+    private static int toInt(Object obj) {
+        if (obj instanceof Number) {
+            return ((Number) obj).intValue();
+        }
+        return Integer.parseInt(obj.toString());
+    }
+
+    private static long toLong(Object obj) {
+        if (obj instanceof Number) {
+            return ((Number) obj).longValue();
+        }
+        return Long.parseLong(obj.toString());
+    }
+
+    private boolean isOptimizable(FilterItem whereItem) {
+        if (whereItem.isCompoundFilter()) {
+            return false;
+        }
+        if (whereItem.getExpression() != null) {
+            return false;
+        }
+        final SelectItem selectItem = whereItem.getSelectItem();
+        if (selectItem.getExpression() != null || selectItem.getAggregateFunction() != null || selectItem
+                .getScalarFunction() != null) {
+            return false;
+        }
+        final Column column = selectItem.getColumn();
+        if (column == null) {
+            return false;
+        }
+
+        switch (column.getName()) {
+        case COLUMN_OFFSET:
+            return OPTIMIZED_OFFSET_OPERATORS.contains(whereItem.getOperator());
+        case COLUMN_PARTITION:
+            return OPTIMIZED_PARTITION_OPERATORS.contains(whereItem.getOperator());
+        default:
+            return false;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
index 90dcf38..a214964 100644
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataSet.java
@@ -20,7 +20,6 @@ package org.apache.metamodel.kafka;
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,7 +29,6 @@ import org.apache.metamodel.data.CachingDataSetHeader;
 import org.apache.metamodel.data.DefaultRow;
 import org.apache.metamodel.data.Row;
 import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
 
 final class KafkaDataSet<K, V> extends AbstractDataSet {
 
@@ -40,12 +38,16 @@ final class KafkaDataSet<K, V> extends AbstractDataSet {
     private Iterator<ConsumerRecord<K, V>> currentIterator;
     private ConsumerRecord<K, V> currentRow;
 
-    public KafkaDataSet(Consumer<K, V> consumer, List<Column> columns) {
-        super(new CachingDataSetHeader(columns.stream().map(col -> new SelectItem(col)).collect(Collectors.toList())));
+    public KafkaDataSet(Consumer<K, V> consumer, List<SelectItem> selectItems) {
+        super(new CachingDataSetHeader(selectItems));
         this.consumer = consumer;
         this.pollTimeout = Long.parseLong(System.getProperty(KafkaDataContext.SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT,
                 "1000"));
     }
+    
+    public Consumer<K, V> getConsumer() {
+        return consumer;
+    }
 
     @Override
     public boolean next() {

http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java
new file mode 100644
index 0000000..2d0cf5e
--- /dev/null
+++ b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextIntegrationTest.java
@@ -0,0 +1,156 @@
+package org.apache.metamodel.kafka;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.WrappingDataSet;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaDataContextIntegrationTest {
+
+    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
+    private static final Logger logger = LoggerFactory.getLogger(KafkaDataContextIntegrationTest.class);
+
+    @Test
+    public void testGetSchemaInfo() {
+        final DataContext dataContext1 = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays
+                .asList("non-existing-topic"));
+
+        Assert.assertEquals("[non-existing-topic, default_table]", dataContext1.getDefaultSchema().getTableNames()
+                .toString());
+
+        final DataContext dataContext2 = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays
+                .asList("test1", "test2", "test3"));
+        Assert.assertEquals("[test1, test2, test3]", dataContext2.getDefaultSchema().getTableNames().toString());
+    }
+
+    @Test
+    public void testQueryNoFilters() {
+        final String topic = "test_" + UUID.randomUUID().toString().replaceAll("\\-", "");
+
+        final DataContext dataContext = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays
+                .asList(topic));
+
+        Assert.assertEquals("[" + topic + ", default_table]", dataContext.getDefaultSchema().getTableNames()
+                .toString());
+
+        final int numRecords = 10000;
+
+        // create a producer thread
+        new Thread(createProducerRunnable(topic, numRecords), "producer").start();
+
+        int counter = 0;
+        try (DataSet dataSet = dataContext.query().from(topic).selectAll().execute()) {
+            logger.info("c: starting");
+            while (dataSet.next()) {
+                counter++;
+                if (counter % 1000 == 0) {
+                    logger.info("c: " + counter);
+                    logger.info(dataSet.getRow().toString());
+                }
+            }
+            logger.info("c: done - " + counter);
+        }
+
+        Assert.assertEquals(numRecords, counter);
+    }
+
+    @Test
+    public void testQueryUsingOffset() throws InterruptedException {
+        final String topic = "test_" + UUID.randomUUID().toString().replaceAll("\\-", "");
+
+        final DataContext dataContext = new KafkaDataContext<>(String.class, String.class, BOOTSTRAP_SERVERS, Arrays
+                .asList(topic));
+
+        final int numRecords = 1000;
+        final int queriedOffset = 500;
+
+        // create a producer thread
+        final Thread thread = new Thread(createProducerRunnable(topic, numRecords), "producer");
+        thread.start();
+        thread.join(); // await completion so that the queried offset will exist at query time
+
+        int counter = 0;
+        try (DataSet dataSet = dataContext.query().from(topic).selectAll().where("offset").gt(queriedOffset)
+                .execute()) {
+
+            // check the assignment and position created for the consumer
+            @SuppressWarnings("resource")
+            DataSet innerDataSet = dataSet;
+            if (innerDataSet instanceof WrappingDataSet) {
+                innerDataSet = ((WrappingDataSet) dataSet).getWrappedDataSet();
+            }
+            Assert.assertTrue(innerDataSet instanceof KafkaDataSet);
+            final Consumer<?, ?> consumer = ((KafkaDataSet<?, ?>) innerDataSet).getConsumer();
+            final Set<TopicPartition> assignment = consumer.assignment();
+            for (TopicPartition assignedTopic : assignment) {
+                final long position = consumer.position(assignedTopic);
+                Assert.assertEquals(queriedOffset + 1, position);
+            }
+
+            while (dataSet.next()) {
+                counter++;
+            }
+        }
+
+        // offset is 0 based, so "greater than 500" will leave 499 records
+        Assert.assertEquals(499, counter);
+    }
+
+    private Runnable createProducerRunnable(String topic, int numRecords) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                final Properties properties = new Properties();
+                properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
+                properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+                properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+                properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "metamodel-test");
+
+                final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
+                int counter = 0;
+                while (counter < numRecords) {
+                    final String key = UUID.randomUUID().toString();
+                    final String value = UUID.randomUUID().toString();
+                    try {
+                        producer.send(new ProducerRecord<String, String>(topic, key, value), new Callback() {
+                            @Override
+                            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                                if (exception != null) {
+                                    logger.info("Callback error");
+                                    exception.printStackTrace();
+                                }
+                            }
+                        });
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        break;
+                    }
+                    if (counter % 1000 == 0) {
+                        logger.info("p: " + counter);
+                    }
+                    counter++;
+                }
+                logger.info("p: closing - " + counter);
+                producer.flush();
+                producer.close();
+                logger.info("p: done - " + counter);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/c6be1aea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 69d52c4..ee2d087 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@ under the License.
 		<module>json</module>
 		<module>xml</module>
 		<module>jdbc</module>
+		<module>kafka</module>
 		<module>elasticsearch</module>
 		<module>hadoop</module>
 		<module>hbase</module>