You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2018/06/06 03:44:18 UTC

[5/8] metamodel git commit: Added (insert-only) UpdateableDataContext capabilities

Added (insert-only) UpdateableDataContext capabilities

Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/0e7545e7
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/0e7545e7
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/0e7545e7

Branch: refs/heads/master
Commit: 0e7545e74d80985c2b90fff6288b90f8e509ea40
Parents: 9f4d15e
Author: Kasper Sørensen <i....@gmail.com>
Authored: Sun Jan 28 15:50:04 2018 -0800
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Sun Jan 28 15:50:38 2018 -0800

----------------------------------------------------------------------
 .../kafka/ConsumerAndProducerFactory.java       |  35 ++++
 .../apache/metamodel/kafka/ConsumerFactory.java |  32 ----
 .../kafka/KafkaConsumerAndProducerFactory.java  | 162 +++++++++++++++++++
 .../metamodel/kafka/KafkaConsumerFactory.java   | 104 ------------
 .../metamodel/kafka/KafkaDataContext.java       |  34 +++-
 .../metamodel/kafka/KafkaInsertBuilder.java     |  54 +++++++
 .../metamodel/kafka/KafkaUpdateCallback.java    |  85 ++++++++++
 .../metamodel/kafka/KafkaDataContextTest.java   |   6 +-
 8 files changed, 365 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java
new file mode 100644
index 0000000..b3c58ea
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+
+/**
+ * Factory interface for the Kafka {@link Consumer} and {@link Producer} objects
+ * used by Apache MetaModel. Since multiple queries may be served at the same
+ * time, multiple consumers may be needed; implementations decide how to create them.
+ */
+public interface ConsumerAndProducerFactory {
+    /** Creates a consumer for the given topic and key/value classes. */
+    public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass);
+
+    /** Creates a producer for the given key/value classes. */
+    public <K, V> Producer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass);
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java
deleted file mode 100644
index efabf38..0000000
--- a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.kafka;
-
-import org.apache.kafka.clients.consumer.Consumer;
-
-/**
- * Factory interface for Kafka {@link Consumer} objects to be used by Apache
- * MetaModel. Since Apache MetaModel may potentially serve multiple queries at
- * the same times, multiple consumers may be needed. This class determines how
- * these are instantiated.
- */
-public interface ConsumerFactory {
-
-    public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass);
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java
new file mode 100644
index 0000000..c0d4b8f
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferSerializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.FloatDeserializer;
+import org.apache.kafka.common.serialization.FloatSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.ShortDeserializer;
+import org.apache.kafka.common.serialization.ShortSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * Default {@link ConsumerAndProducerFactory} implementation. Each client is
+ * configured from a copy of the base properties, with (de)serializers chosen
+ * from the requested key and value classes.
+ */
+public class KafkaConsumerAndProducerFactory implements ConsumerAndProducerFactory {
+
+    private final Properties baseProperties;
+
+    /** Creates a factory whose only base property is {@code bootstrap.servers}. */
+    public KafkaConsumerAndProducerFactory(String bootstrapServers) {
+        this.baseProperties = new Properties();
+        this.baseProperties.setProperty("bootstrap.servers", bootstrapServers);
+    }
+
+    /** Creates a factory on top of the given properties, which are kept by reference. */
+    public KafkaConsumerAndProducerFactory(Properties baseProperties) {
+        this.baseProperties = baseProperties;
+    }
+
+    /** Creates a producer whose serializers match the given key and value classes. */
+    @Override
+    public <K, V> Producer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass) {
+        final Properties properties = copyBaseProperties();
+        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerForClass(keyClass).getName());
+        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerForClass(valueClass).getName());
+        // hand over the merged copy so that the serializer settings above take effect
+        return new KafkaProducer<>(properties);
+    }
+
+    /** Creates a consumer with a timestamp-suffixed group id and {@code auto.offset.reset=earliest}. */
+    @Override
+    public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) {
+        final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis();
+        final Properties properties = copyBaseProperties();
+        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass).getName());
+        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerForClass(valueClass)
+                .getName());
+        return new KafkaConsumer<>(properties);
+    }
+
+    /** Returns a new Properties holding every string property (defaults included) of the base properties. */
+    private Properties copyBaseProperties() {
+        final Properties properties = new Properties();
+        baseProperties.stringPropertyNames().forEach(k -> properties.setProperty(k, baseProperties.getProperty(k)));
+        return properties;
+    }
+
+    /** Picks the Kafka serializer class for a key or value class; unknown classes get the byte[] one. */
+    private static Class<? extends Serializer<?>> serializerForClass(Class<?> cls) {
+        if (cls == String.class || cls == CharSequence.class) {
+            return StringSerializer.class;
+        }
+        if (cls == Double.class) {
+            return DoubleSerializer.class;
+        }
+        if (cls == Integer.class) {
+            return IntegerSerializer.class;
+        }
+        if (cls == Float.class) {
+            return FloatSerializer.class;
+        }
+        if (cls == Long.class) {
+            return LongSerializer.class;
+        }
+        if (cls == Short.class) {
+            return ShortSerializer.class;
+        }
+        if (cls == Bytes.class) {
+            return BytesSerializer.class;
+        }
+        if (cls == ByteBuffer.class) {
+            return ByteBufferSerializer.class;
+        }
+        // byte[], Byte[], Object and any other class: fall back to doing nothing (byte[])
+        return ByteArraySerializer.class;
+    }
+
+    /** Picks the Kafka deserializer class for a key or value class; unknown classes get the byte[] one. */
+    private static Class<? extends Deserializer<?>> deserializerForClass(Class<?> cls) {
+        if (cls == String.class || cls == CharSequence.class) {
+            return StringDeserializer.class;
+        }
+        if (cls == Double.class) {
+            return DoubleDeserializer.class;
+        }
+        if (cls == Integer.class) {
+            return IntegerDeserializer.class;
+        }
+        if (cls == Float.class) {
+            return FloatDeserializer.class;
+        }
+        if (cls == Long.class) {
+            return LongDeserializer.class;
+        }
+        if (cls == Short.class) {
+            return ShortDeserializer.class;
+        }
+        if (cls == Bytes.class) {
+            return BytesDeserializer.class;
+        }
+        if (cls == ByteBuffer.class) {
+            return ByteBufferDeserializer.class;
+        }
+        // byte[], Byte[], Object and any other class: fall back to doing nothing (byte[])
+        return ByteArrayDeserializer.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
deleted file mode 100644
index 15e5f70..0000000
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Properties;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteBufferDeserializer;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.DoubleDeserializer;
-import org.apache.kafka.common.serialization.FloatDeserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.ShortDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Bytes;
-
-/**
- * Default {@link ConsumerFactory} implementation.
- */
-public class KafkaConsumerFactory implements ConsumerFactory {
-
-    private final Properties baseProperties;
-
-    public KafkaConsumerFactory(String bootstrapServers) {
-        this.baseProperties = new Properties();
-        this.baseProperties.setProperty("bootstrap.servers", bootstrapServers);
-    }
-
-    public KafkaConsumerFactory(Properties baseProperties) {
-        this.baseProperties = baseProperties;
-    }
-
-    @Override
-    public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) {
-        final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis();
-
-        final Properties properties = new Properties();
-        baseProperties.stringPropertyNames().forEach(k -> {
-            properties.setProperty(k, baseProperties.getProperty(k));
-        });
-
-        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass).getName());
-        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass)
-                .getName());
-        return new KafkaConsumer<>(properties);
-    }
-
-    private static Class<? extends Deserializer<?>> deserializerForClass(Class<?> cls) {
-        if (cls == String.class || cls == CharSequence.class) {
-            return StringDeserializer.class;
-        }
-        if (cls == Double.class) {
-            return DoubleDeserializer.class;
-        }
-        if (cls == Integer.class) {
-            return IntegerDeserializer.class;
-        }
-        if (cls == Float.class) {
-            return FloatDeserializer.class;
-        }
-        if (cls == Long.class) {
-            return LongDeserializer.class;
-        }
-        if (cls == Short.class) {
-            return ShortDeserializer.class;
-        }
-        if (cls == Bytes.class) {
-            return BytesDeserializer.class;
-        }
-        if (cls == ByteBuffer.class) {
-            return ByteBufferDeserializer.class;
-        }
-        if (cls == byte[].class || cls == Byte[].class || cls == Object.class) {
-            return ByteArrayDeserializer.class;
-        }
-        // fall back to doing nothing
-        return ByteArrayDeserializer.class;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
index 2fa8c06..8f4d7a7 100644
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
@@ -28,10 +28,14 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateSummary;
+import org.apache.metamodel.UpdateableDataContext;
 import org.apache.metamodel.data.DataSet;
 import org.apache.metamodel.data.FirstRowDataSet;
 import org.apache.metamodel.data.MaxRowsDataSet;
@@ -47,7 +51,7 @@ import org.apache.metamodel.schema.MutableTable;
 import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
 
-public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
+public class KafkaDataContext<K, V> extends QueryPostprocessDataContext implements UpdateableDataContext {
 
     public static final String SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT = "metamodel.kafka.consumer.poll.timeout";
 
@@ -64,19 +68,19 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
 
     private final Class<K> keyClass;
     private final Class<V> valueClass;
-    private final ConsumerFactory consumerFactory;
+    private final ConsumerAndProducerFactory consumerAndProducerFactory;
     private final Supplier<Collection<String>> topicSupplier;
 
     public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, String bootstrapServers,
             Collection<String> topics) {
-        this(keyClass, valueClass, new KafkaConsumerFactory(bootstrapServers), () -> topics);
+        this(keyClass, valueClass, new KafkaConsumerAndProducerFactory(bootstrapServers), () -> topics);
     }
 
-    public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, ConsumerFactory consumerFactory,
-            Supplier<Collection<String>> topicSupplier) {
+    public KafkaDataContext(Class<K> keyClass, Class<V> valueClass,
+            ConsumerAndProducerFactory consumerAndProducerFactory, Supplier<Collection<String>> topicSupplier) {
         this.keyClass = keyClass;
         this.valueClass = valueClass;
-        this.consumerFactory = consumerFactory;
+        this.consumerAndProducerFactory = consumerAndProducerFactory;
         this.topicSupplier = topicSupplier;
     }
 
@@ -106,7 +110,7 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
     @Override
     protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
         final String topic = table.getName();
-        final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass);
+        final Consumer<K, V> consumer = consumerAndProducerFactory.createConsumer(topic, keyClass, valueClass);
         final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
         final List<TopicPartition> partitions = partitionInfos.stream().map(partitionInfo -> {
             return new TopicPartition(topic, partitionInfo.partition());
@@ -171,7 +175,7 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
                 }
 
                 final String topic = table.getName();
-                final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass);
+                final Consumer<K, V> consumer = consumerAndProducerFactory.createConsumer(topic, keyClass, valueClass);
 
                 // handle partition filtering
                 final List<TopicPartition> assignedPartitions;
@@ -258,4 +262,18 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
             return false;
         }
     }
+
+    /** Runs the script against a fresh producer that is flushed afterwards and closed in every case. */
+    @Override
+    public UpdateSummary executeUpdate(UpdateScript update) {
+        try (Producer<K, V> producer = consumerAndProducerFactory.createProducer(keyClass, valueClass)) {
+            final KafkaUpdateCallback<K, V> callback = new KafkaUpdateCallback<>(this, producer);
+            try {
+                update.run(callback);
+            } finally {
+                callback.flush();
+            }
+            return callback.getUpdateSummary();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java
new file mode 100644
index 0000000..4253bce
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+
+/** Insert builder that sends every inserted row as one record to the Kafka topic named after the table. */
+final class KafkaInsertBuilder<K, V> extends AbstractRowInsertionBuilder<KafkaUpdateCallback<K, V>> {
+
+    public KafkaInsertBuilder(KafkaUpdateCallback<K, V> updateCallback, Table table) {
+        super(updateCallback, table);
+    }
+
+    /** Sends the row's key and value columns (each {@code null} when not set) through the callback's producer. */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void execute() throws MetaModelException {
+        final Column[] columns = getColumns();
+        final Object[] values = getValues();
+        K key = null;
+        V value = null;
+        for (int i = 0; i < columns.length; i++) {
+            // compare names by content; reference equality only matches the very same String instance
+            final String columnName = columns[i].getName();
+            if (KafkaDataContext.COLUMN_KEY.equals(columnName)) {
+                key = (K) values[i];
+            } else if (KafkaDataContext.COLUMN_VALUE.equals(columnName)) {
+                value = (V) values[i];
+            }
+        }
+        getUpdateCallback().getProducer().send(new ProducerRecord<K, V>(getTable().getName(), key, value));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java
new file mode 100644
index 0000000..a7b6e96
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+
+/** Update callback for Kafka: inserts are sent via the given producer; create, drop and delete are rejected. */
+final class KafkaUpdateCallback<K, V> extends AbstractUpdateCallback {
+
+    private final Producer<K, V> producer;
+
+    public KafkaUpdateCallback(KafkaDataContext<K, V> kafkaDataContext, Producer<K, V> producer) {
+        super(kafkaDataContext);
+        this.producer = producer;
+    }
+
+    @Override
+    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
+            IllegalStateException {
+        throw new UnsupportedOperationException(); // table creation is not supported
+    }
+
+    @Override
+    public boolean isDropTableSupported() {
+        return false;
+    }
+
+    @Override
+    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        throw new UnsupportedOperationException(); // consistent with isDropTableSupported()
+    }
+
+    @Override
+    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        // the builder gets this callback so that it can reach the producer via getProducer()
+        return new KafkaInsertBuilder<>(this, table);
+    }
+
+    @Override
+    public boolean isDeleteSupported() {
+        return false;
+    }
+
+    @Override
+    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        throw new UnsupportedOperationException(); // consistent with isDeleteSupported()
+    }
+
+    public Producer<K, V> getProducer() {
+        return producer;
+    }
+
+    public void flush() {
+        producer.flush();
+    }
+
+    public void close() {
+        producer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
index 4b19940..491c96d 100644
--- a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
+++ b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
@@ -25,7 +25,7 @@ public class KafkaDataContextTest extends EasyMockSupport {
 
     @Test
     public void testGetSchemaInfo() {
-        final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class);
+        final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class);
 
         replayAll();
 
@@ -39,7 +39,7 @@ public class KafkaDataContextTest extends EasyMockSupport {
 
     @Test
     public void testQueryWithoutOptimization() {
-        final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class);
+        final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class);
         @SuppressWarnings("unchecked")
         final Consumer<String, String> consumer = createMock(Consumer.class);
 
@@ -94,7 +94,7 @@ public class KafkaDataContextTest extends EasyMockSupport {
 
     @Test
     public void testQueryOptimizationByPartition() {
-        final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class);
+        final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class);
         @SuppressWarnings("unchecked")
         final Consumer<String, String> consumer = createMock(Consumer.class);