Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/09/19 13:31:21 UTC
[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503504#comment-15503504 ]
ASF GitHub Bot commented on FLINK-3874:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2430#discussion_r79390399
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
@@ -17,123 +17,60 @@
  */
 package org.apache.flink.streaming.connectors.kafka;

-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.test.util.SuccessException;

 import org.junit.Test;

 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Properties;

-import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;

-public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
+public abstract class KafkaTableSinkTestBase implements Serializable {

-    protected final static String TOPIC = "customPartitioningTestTopic";
-    protected final static int PARALLELISM = 1;
+    protected final static String TOPIC = "testTopic";
     protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
     protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
+    protected FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
+
     @Test
     public void testKafkaTableSink() throws Exception {
-        LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
-
-        createTestTopic(TOPIC, PARALLELISM, 1);
-        StreamExecutionEnvironment env = createEnvironment();
-
-        createProducingTopology(env);
-        createConsumingTopology(env);
-
-        tryExecute(env, "custom partitioning test");
-        deleteTestTopic(TOPIC);
-        LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
-    }
-
-    private StreamExecutionEnvironment createEnvironment() {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
-        env.setRestartStrategy(RestartStrategies.noRestart());
-        env.getConfig().disableSysoutLogging();
-        return env;
-    }
-
-    private void createProducingTopology(StreamExecutionEnvironment env) {
-        DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
-            private boolean running = true;
-
-            @Override
-            public void run(SourceContext<Row> ctx) throws Exception {
-                long cnt = 0;
-                while (running) {
-                    Row row = new Row(2);
-                    row.setField(0, cnt);
-                    row.setField(1, "kafka-" + cnt);
-                    ctx.collect(row);
-                    cnt++;
-                }
-            }
+        DataStream dataStream = mock(DataStream.class);
+        KafkaTableSink kafkaTableSink = createTableSink();
+        kafkaTableSink.emitDataStream(dataStream);

-            @Override
-            public void cancel() {
-                running = false;
-            }
-        })
-        .setParallelism(1);
-
-        KafkaTableSink kafkaTableSinkBase = createTableSink();
-
-        kafkaTableSinkBase.emitDataStream(stream);
+        verify(dataStream).addSink(kafkaProducer);
--- End diff ---
This test checks that `addSink` was called with a `FlinkKafkaProducerBase` object. I think it would be good to extend the test to make sure that
- the right producer is used (Kafka 0.8, 0.9)
- the correct topic is used
- the properties are correctly passed on
- the correct serialization schema is used
- the correct partitioner is used
Would that be possible?
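One possible shape for those assertions, as a sketch only: it assumes `createTableSink()` is reworked so the sink builds a real producer instead of the mocked `kafkaProducer` field, captures the sink with Mockito's `ArgumentCaptor`, and reads the producer's protected state via reflection, since `FlinkKafkaProducerBase` exposes no getters. The hooks `getExpectedProducerClass()`, `createSinkProperties()`, `getExpectedSerializationSchemaClass()`, and `getExpectedPartitionerClass()`, as well as the field names `defaultTopicId`, `producerConfig`, `schema`, and `partitioner`, are assumptions for illustration, not existing API:

// Additional imports assumed:
import java.lang.reflect.Field;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.mockito.ArgumentCaptor;

    @Test
    public void testProducerConfiguration() throws Exception {
        DataStream dataStream = mock(DataStream.class);
        KafkaTableSink kafkaTableSink = createTableSink();
        kafkaTableSink.emitDataStream(dataStream);

        // Capture the concrete producer instead of only verifying the mock interaction.
        ArgumentCaptor<SinkFunction> captor = ArgumentCaptor.forClass(SinkFunction.class);
        verify(dataStream).addSink(captor.capture());
        SinkFunction<?> producer = captor.getValue();

        // The right producer class for the Kafka version under test (0.8 vs. 0.9);
        // each subclass would return e.g. FlinkKafkaProducer08.class here.
        assertEquals(getExpectedProducerClass(), producer.getClass());

        // Topic, properties, serialization schema, and partitioner: read via
        // reflection because FlinkKafkaProducerBase keeps them in protected fields.
        assertEquals(TOPIC, getField(producer, "defaultTopicId"));
        assertEquals(createSinkProperties(), getField(producer, "producerConfig"));
        assertEquals(getExpectedSerializationSchemaClass(), getField(producer, "schema").getClass());
        assertEquals(getExpectedPartitionerClass(), getField(producer, "partitioner").getClass());
    }

    private static Object getField(Object target, String name) throws Exception {
        // Walk up the class hierarchy so the lookup works for any
        // FlinkKafkaProducerBase subclass.
        for (Class<?> clazz = target.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
            try {
                Field field = clazz.getDeclaredField(name);
                field.setAccessible(true);
                return field.get(target);
            } catch (NoSuchFieldException ignored) {
                // field not declared at this level, keep walking up
            }
        }
        throw new NoSuchFieldException(name);
    }

Reflection is admittedly clunky; package-private getters on `FlinkKafkaProducerBase`, or a test subclass that records the constructor arguments, would be cleaner alternatives.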
> Add a Kafka TableSink with JSON serialization
> ---------------------------------------------
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
> Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.
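
For context, a rough usage sketch of the sink requested above, based on the API proposed in the pull request (names such as `Kafka08JsonTableSink` and `FixedPartitioner` follow the PR branch and may change before merge; the broker address is a placeholder):

// Imports assumed:
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder broker address

    // A KafkaTableSink that serializes each Row as a JSON object and writes it
    // to the given topic, using the supplied partitioner for partition assignment.
    KafkaTableSink sink = new Kafka08JsonTableSink("testTopic", props, new FixedPartitioner());

The Table API side would then configure the sink with the table's field names and types and hand it the converted `DataStream<Row>` through `emitDataStream(...)`, which is the call path the test discussed above exercises.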
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)