You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/09/19 13:31:21 UTC

[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

    [ https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15503504#comment-15503504 ] 

ASF GitHub Bot commented on FLINK-3874:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2430#discussion_r79390399
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
    @@ -17,123 +17,60 @@
      */
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.apache.flink.api.common.functions.RichMapFunction;
    -import org.apache.flink.api.common.restartstrategy.RestartStrategies;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
     import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
     import org.apache.flink.streaming.api.datastream.DataStream;
    -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    -import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
     import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    -import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    -import org.apache.flink.test.util.SuccessException;
     import org.junit.Test;
     
     import java.io.Serializable;
    -import java.util.HashSet;
     import java.util.Properties;
     
    -import static org.apache.flink.test.util.TestUtils.tryExecute;
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
     
    -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
    +public abstract class KafkaTableSinkTestBase implements Serializable {
     
    -	protected final static String TOPIC = "customPartitioningTestTopic";
    -	protected final static int PARALLELISM = 1;
    +	protected final static String TOPIC = "testTopic";
     	protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
     	protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
     
    +	protected FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
    +
     	@Test
     	public void testKafkaTableSink() throws Exception {
    -		LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
    -
    -		createTestTopic(TOPIC, PARALLELISM, 1);
    -		StreamExecutionEnvironment env = createEnvironment();
    -
    -		createProducingTopology(env);
    -		createConsumingTopology(env);
    -
    -		tryExecute(env, "custom partitioning test");
    -		deleteTestTopic(TOPIC);
    -		LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
    -	}
    -
    -	private StreamExecutionEnvironment createEnvironment() {
    -		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    -		env.setRestartStrategy(RestartStrategies.noRestart());
    -		env.getConfig().disableSysoutLogging();
    -		return env;
    -	}
    -
    -	private void createProducingTopology(StreamExecutionEnvironment env) {
    -		DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
    -			private boolean running = true;
    -
    -			@Override
    -			public void run(SourceContext<Row> ctx) throws Exception {
    -				long cnt = 0;
    -				while (running) {
    -					Row row = new Row(2);
    -					row.setField(0, cnt);
    -					row.setField(1, "kafka-" + cnt);
    -					ctx.collect(row);
    -					cnt++;
    -				}
    -			}
    +		DataStream dataStream = mock(DataStream.class);
    +		KafkaTableSink kafkaTableSink = createTableSink();
    +		kafkaTableSink.emitDataStream(dataStream);
     
    -			@Override
    -			public void cancel() {
    -				running = false;
    -			}
    -		})
    -		.setParallelism(1);
    -
    -		KafkaTableSink kafkaTableSinkBase = createTableSink();
    -
    -		kafkaTableSinkBase.emitDataStream(stream);
    +		verify(dataStream).addSink(kafkaProducer);
    --- End diff --
    
    This test checks that `addSink` was called with a `FlinkKafkaProducerBase` object. I think it would be good to extend the test to make sure that
    - the right producer was used (Kafka 0.8, 0.9)
    - the correct topic is fetched
    - the properties are correctly passed on
    - the correct serialization scheme is used
    - the correct partitioner is used
    
    Would that be possible?


> Add a Kafka TableSink with JSON serialization
> ---------------------------------------------
>
>                 Key: FLINK-3874
>                 URL: https://issues.apache.org/jira/browse/FLINK-3874
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Ivan Mushketyk
>            Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)