You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2018/06/06 03:44:18 UTC
[5/8] metamodel git commit: Added (insert-only) UpdateableDataContext
capabilities
Added (insert-only) UpdateableDataContext capabilities
Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/0e7545e7
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/0e7545e7
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/0e7545e7
Branch: refs/heads/master
Commit: 0e7545e74d80985c2b90fff6288b90f8e509ea40
Parents: 9f4d15e
Author: Kasper Sørensen <i....@gmail.com>
Authored: Sun Jan 28 15:50:04 2018 -0800
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Sun Jan 28 15:50:38 2018 -0800
----------------------------------------------------------------------
.../kafka/ConsumerAndProducerFactory.java | 35 ++++
.../apache/metamodel/kafka/ConsumerFactory.java | 32 ----
.../kafka/KafkaConsumerAndProducerFactory.java | 162 +++++++++++++++++++
.../metamodel/kafka/KafkaConsumerFactory.java | 104 ------------
.../metamodel/kafka/KafkaDataContext.java | 34 +++-
.../metamodel/kafka/KafkaInsertBuilder.java | 54 +++++++
.../metamodel/kafka/KafkaUpdateCallback.java | 85 ++++++++++
.../metamodel/kafka/KafkaDataContextTest.java | 6 +-
8 files changed, 365 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java
new file mode 100644
index 0000000..b3c58ea
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerAndProducerFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+
+/**
+ * Factory interface for Kafka {@link Consumer} and {@link Producer} objects to
+ * be used by Apache MetaModel. Since Apache MetaModel may potentially serve
+ * multiple queries at the same times, multiple consumers may be needed. This
+ * class determines how these are instantiated.
+ */
+public interface ConsumerAndProducerFactory {
+
+ public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass);
+
+ public <K, V> Producer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass);
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java
deleted file mode 100644
index efabf38..0000000
--- a/kafka/src/main/java/org/apache/metamodel/kafka/ConsumerFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.kafka;
-
-import org.apache.kafka.clients.consumer.Consumer;
-
-/**
- * Factory interface for Kafka {@link Consumer} objects to be used by Apache
- * MetaModel. Since Apache MetaModel may potentially serve multiple queries at
- * the same times, multiple consumers may be needed. This class determines how
- * these are instantiated.
- */
-public interface ConsumerFactory {
-
- public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass);
-}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java
new file mode 100644
index 0000000..c0d4b8f
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerAndProducerFactory.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferSerializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.FloatDeserializer;
+import org.apache.kafka.common.serialization.FloatSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.ShortDeserializer;
+import org.apache.kafka.common.serialization.ShortSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * Default {@link ConsumerAndProducerFactory} implementation.
+ */
+public class KafkaConsumerAndProducerFactory implements ConsumerAndProducerFactory {
+
+ private final Properties baseProperties;
+
+ public KafkaConsumerAndProducerFactory(String bootstrapServers) {
+ this.baseProperties = new Properties();
+ this.baseProperties.setProperty("bootstrap.servers", bootstrapServers);
+ }
+
+ public KafkaConsumerAndProducerFactory(Properties baseProperties) {
+ this.baseProperties = baseProperties;
+ }
+
+ @Override
+ public <K, V> Producer<K, V> createProducer(Class<K> keyClass, Class<V> valueClass) {
+ final Properties properties = new Properties();
+ baseProperties.stringPropertyNames().forEach(k -> {
+ properties.setProperty(k, baseProperties.getProperty(k));
+ });
+
+ properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerForClass(keyClass).getName());
+ properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerForClass(keyClass).getName());
+
+ return new KafkaProducer<>(baseProperties);
+ }
+
+ @Override
+ public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) {
+ final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis();
+
+ final Properties properties = new Properties();
+ baseProperties.stringPropertyNames().forEach(k -> {
+ properties.setProperty(k, baseProperties.getProperty(k));
+ });
+
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass).getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass)
+ .getName());
+ return new KafkaConsumer<>(properties);
+ }
+
+ private static Class<? extends Serializer<?>> serializerForClass(Class<?> cls) {
+ if (cls == String.class || cls == CharSequence.class) {
+ return StringSerializer.class;
+ }
+ if (cls == Double.class) {
+ return DoubleSerializer.class;
+ }
+ if (cls == Integer.class) {
+ return IntegerSerializer.class;
+ }
+ if (cls == Float.class) {
+ return FloatSerializer.class;
+ }
+ if (cls == Long.class) {
+ return LongSerializer.class;
+ }
+ if (cls == Short.class) {
+ return ShortSerializer.class;
+ }
+ if (cls == Bytes.class) {
+ return BytesSerializer.class;
+ }
+ if (cls == ByteBuffer.class) {
+ return ByteBufferSerializer.class;
+ }
+ if (cls == byte[].class || cls == Byte[].class || cls == Object.class) {
+ return ByteArraySerializer.class;
+ }
+ // fall back to doing nothing (byte[])
+ return ByteArraySerializer.class;
+ }
+
+ private static Class<? extends Deserializer<?>> deserializerForClass(Class<?> cls) {
+ if (cls == String.class || cls == CharSequence.class) {
+ return StringDeserializer.class;
+ }
+ if (cls == Double.class) {
+ return DoubleDeserializer.class;
+ }
+ if (cls == Integer.class) {
+ return IntegerDeserializer.class;
+ }
+ if (cls == Float.class) {
+ return FloatDeserializer.class;
+ }
+ if (cls == Long.class) {
+ return LongDeserializer.class;
+ }
+ if (cls == Short.class) {
+ return ShortDeserializer.class;
+ }
+ if (cls == Bytes.class) {
+ return BytesDeserializer.class;
+ }
+ if (cls == ByteBuffer.class) {
+ return ByteBufferDeserializer.class;
+ }
+ if (cls == byte[].class || cls == Byte[].class || cls == Object.class) {
+ return ByteArrayDeserializer.class;
+ }
+ // fall back to doing nothing (byte[])
+ return ByteArrayDeserializer.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
deleted file mode 100644
index 15e5f70..0000000
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaConsumerFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Properties;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteBufferDeserializer;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.DoubleDeserializer;
-import org.apache.kafka.common.serialization.FloatDeserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.ShortDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.Bytes;
-
-/**
- * Default {@link ConsumerFactory} implementation.
- */
-public class KafkaConsumerFactory implements ConsumerFactory {
-
- private final Properties baseProperties;
-
- public KafkaConsumerFactory(String bootstrapServers) {
- this.baseProperties = new Properties();
- this.baseProperties.setProperty("bootstrap.servers", bootstrapServers);
- }
-
- public KafkaConsumerFactory(Properties baseProperties) {
- this.baseProperties = baseProperties;
- }
-
- @Override
- public <K, V> Consumer<K, V> createConsumer(String topic, Class<K> keyClass, Class<V> valueClass) {
- final String groupId = "apache_metamodel_" + topic + "_" + System.currentTimeMillis();
-
- final Properties properties = new Properties();
- baseProperties.stringPropertyNames().forEach(k -> {
- properties.setProperty(k, baseProperties.getProperty(k));
- });
-
- properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass).getName());
- properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializerForClass(keyClass)
- .getName());
- return new KafkaConsumer<>(properties);
- }
-
- private static Class<? extends Deserializer<?>> deserializerForClass(Class<?> cls) {
- if (cls == String.class || cls == CharSequence.class) {
- return StringDeserializer.class;
- }
- if (cls == Double.class) {
- return DoubleDeserializer.class;
- }
- if (cls == Integer.class) {
- return IntegerDeserializer.class;
- }
- if (cls == Float.class) {
- return FloatDeserializer.class;
- }
- if (cls == Long.class) {
- return LongDeserializer.class;
- }
- if (cls == Short.class) {
- return ShortDeserializer.class;
- }
- if (cls == Bytes.class) {
- return BytesDeserializer.class;
- }
- if (cls == ByteBuffer.class) {
- return ByteBufferDeserializer.class;
- }
- if (cls == byte[].class || cls == Byte[].class || cls == Object.class) {
- return ByteArrayDeserializer.class;
- }
- // fall back to doing nothing
- return ByteArrayDeserializer.class;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
index 2fa8c06..8f4d7a7 100644
--- a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaDataContext.java
@@ -28,10 +28,14 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateSummary;
+import org.apache.metamodel.UpdateableDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.FirstRowDataSet;
import org.apache.metamodel.data.MaxRowsDataSet;
@@ -47,7 +51,7 @@ import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
-public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
+public class KafkaDataContext<K, V> extends QueryPostprocessDataContext implements UpdateableDataContext {
public static final String SYSTEM_PROPERTY_CONSUMER_POLL_TIMEOUT = "metamodel.kafka.consumer.poll.timeout";
@@ -64,19 +68,19 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
private final Class<K> keyClass;
private final Class<V> valueClass;
- private final ConsumerFactory consumerFactory;
+ private final ConsumerAndProducerFactory consumerAndProducerFactory;
private final Supplier<Collection<String>> topicSupplier;
public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, String bootstrapServers,
Collection<String> topics) {
- this(keyClass, valueClass, new KafkaConsumerFactory(bootstrapServers), () -> topics);
+ this(keyClass, valueClass, new KafkaConsumerAndProducerFactory(bootstrapServers), () -> topics);
}
- public KafkaDataContext(Class<K> keyClass, Class<V> valueClass, ConsumerFactory consumerFactory,
- Supplier<Collection<String>> topicSupplier) {
+ public KafkaDataContext(Class<K> keyClass, Class<V> valueClass,
+ ConsumerAndProducerFactory consumerAndProducerFactory, Supplier<Collection<String>> topicSupplier) {
this.keyClass = keyClass;
this.valueClass = valueClass;
- this.consumerFactory = consumerFactory;
+ this.consumerAndProducerFactory = consumerAndProducerFactory;
this.topicSupplier = topicSupplier;
}
@@ -106,7 +110,7 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
@Override
protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
final String topic = table.getName();
- final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass);
+ final Consumer<K, V> consumer = consumerAndProducerFactory.createConsumer(topic, keyClass, valueClass);
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
final List<TopicPartition> partitions = partitionInfos.stream().map(partitionInfo -> {
return new TopicPartition(topic, partitionInfo.partition());
@@ -171,7 +175,7 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
}
final String topic = table.getName();
- final Consumer<K, V> consumer = consumerFactory.createConsumer(topic, keyClass, valueClass);
+ final Consumer<K, V> consumer = consumerAndProducerFactory.createConsumer(topic, keyClass, valueClass);
// handle partition filtering
final List<TopicPartition> assignedPartitions;
@@ -258,4 +262,18 @@ public class KafkaDataContext<K, V> extends QueryPostprocessDataContext {
return false;
}
}
+
+ @Override
+ public UpdateSummary executeUpdate(UpdateScript update) {
+ final Producer<K, V> producer = consumerAndProducerFactory.createProducer(keyClass, valueClass);
+ final KafkaUpdateCallback<K, V> callback = new KafkaUpdateCallback<>(this, producer);
+ try {
+ update.run(callback);
+ } finally {
+ callback.flush();
+ }
+ final UpdateSummary updateSummary = callback.getUpdateSummary();
+ callback.close();
+ return updateSummary;
+ }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java
new file mode 100644
index 0000000..4253bce
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaInsertBuilder.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+
+final class KafkaInsertBuilder<K, V> extends AbstractRowInsertionBuilder<KafkaUpdateCallback<K, V>> {
+
+ public KafkaInsertBuilder(KafkaUpdateCallback<K, V> updateCallback, Table table) {
+ super(updateCallback, table);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void execute() throws MetaModelException {
+ final Column[] columns = getColumns();
+ final Object[] values = getValues();
+
+ K key = null;
+ V value = null;
+
+ for (int i = 0; i < columns.length; i++) {
+ if (columns[i].getName() == KafkaDataContext.COLUMN_KEY) {
+ key = (K) values[i];
+ }
+ if (columns[i].getName() == KafkaDataContext.COLUMN_VALUE) {
+ value = (V) values[i];
+ }
+ }
+
+ getUpdateCallback().getProducer().send(new ProducerRecord<K, V>(getTable().getName(), key, value));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java
new file mode 100644
index 0000000..a7b6e96
--- /dev/null
+++ b/kafka/src/main/java/org/apache/metamodel/kafka/KafkaUpdateCallback.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.kafka;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+
+final class KafkaUpdateCallback<K, V> extends AbstractUpdateCallback {
+
+ private final Producer<K, V> producer;
+
+ public KafkaUpdateCallback(KafkaDataContext<K, V> kafkaDataContext, Producer<K, V> producer) {
+ super(kafkaDataContext);
+ this.producer = producer;
+ }
+
+ @Override
+ public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
+ IllegalStateException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDropTableSupported() {
+ return false;
+ }
+
+ @Override
+ public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
+ UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
+ UnsupportedOperationException {
+ return new KafkaInsertBuilder<>(this, table);
+ }
+
+ @Override
+ public boolean isDeleteSupported() {
+ return false;
+ }
+
+ @Override
+ public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
+ UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
+
+ public Producer<K, V> getProducer() {
+ return producer;
+ }
+
+ public void flush() {
+ producer.flush();
+ }
+
+ public void close() {
+ producer.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/0e7545e7/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
index 4b19940..491c96d 100644
--- a/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
+++ b/kafka/src/test/java/org/apache/metamodel/kafka/KafkaDataContextTest.java
@@ -25,7 +25,7 @@ public class KafkaDataContextTest extends EasyMockSupport {
@Test
public void testGetSchemaInfo() {
- final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class);
+ final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class);
replayAll();
@@ -39,7 +39,7 @@ public class KafkaDataContextTest extends EasyMockSupport {
@Test
public void testQueryWithoutOptimization() {
- final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class);
+ final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class);
@SuppressWarnings("unchecked")
final Consumer<String, String> consumer = createMock(Consumer.class);
@@ -94,7 +94,7 @@ public class KafkaDataContextTest extends EasyMockSupport {
@Test
public void testQueryOptimizationByPartition() {
- final ConsumerFactory consumerFactory = createMock(ConsumerFactory.class);
+ final ConsumerAndProducerFactory consumerFactory = createMock(ConsumerAndProducerFactory.class);
@SuppressWarnings("unchecked")
final Consumer<String, String> consumer = createMock(Consumer.class);