Posted to issues@flink.apache.org by mushketyk <gi...@git.apache.org> on 2016/08/27 22:30:33 UTC

[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

GitHub user mushketyk opened a pull request:

    https://github.com/apache/flink/pull/2430

    [FLINK-3874] Rewrite Kafka JSON Table sink tests

    Turned Kafka JSON Table sink tests into unit tests as discussed here: https://issues.apache.org/jira/browse/FLINK-3874
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink fix-kafka-tests

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2430.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2430
    
----
commit bdb52d2164d3cea83a9cc9121dc3663e50e50aa9
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-08-27T22:24:21Z

    [FLINK-3874] Rewrite Kafka JSON Table sink tests

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/2430
  
    @mushketyk thanks. I will merge this.



[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2430#discussion_r80783750
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
    @@ -17,123 +17,79 @@
      */
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.apache.flink.api.common.functions.RichMapFunction;
    -import org.apache.flink.api.common.restartstrategy.RestartStrategies;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
     import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
     import org.apache.flink.streaming.api.datastream.DataStream;
    -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    -import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
     import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    -import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    -import org.apache.flink.test.util.SuccessException;
    +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
     import org.junit.Test;
     
     import java.io.Serializable;
    -import java.util.HashSet;
     import java.util.Properties;
     
    -import static org.apache.flink.test.util.TestUtils.tryExecute;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotSame;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.any;
    +import static org.mockito.Mockito.spy;
    +import static org.mockito.Mockito.verify;
     
    -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
    +public abstract class KafkaTableSinkTestBase implements Serializable {
     
    -	protected final static String TOPIC = "customPartitioningTestTopic";
    -	protected final static int PARALLELISM = 1;
    +	protected final static String TOPIC = "testTopic";
     	protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
     	protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
     
    +	protected FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
    +
     	@Test
     	public void testKafkaTableSink() throws Exception {
    -		LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
    -
    -		createTestTopic(TOPIC, PARALLELISM, 1);
    -		StreamExecutionEnvironment env = createEnvironment();
    -
    -		createProducingTopology(env);
    -		createConsumingTopology(env);
    -
    -		tryExecute(env, "custom partitioning test");
    -		deleteTestTopic(TOPIC);
    -		LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
    -	}
    +		DataStream dataStream = mock(DataStream.class);
    +		KafkaTableSink kafkaTableSink = createTableSink();
    +		kafkaTableSink.emitDataStream(dataStream);
     
    -	private StreamExecutionEnvironment createEnvironment() {
    -		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    -		env.setRestartStrategy(RestartStrategies.noRestart());
    -		env.getConfig().disableSysoutLogging();
    -		return env;
    +		verify(dataStream).addSink(kafkaProducer);
     	}
     
    -	private void createProducingTopology(StreamExecutionEnvironment env) {
    -		DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
    -			private boolean running = true;
    -
    -			@Override
    -			public void run(SourceContext<Row> ctx) throws Exception {
    -				long cnt = 0;
    -				while (running) {
    -					Row row = new Row(2);
    -					row.setField(0, cnt);
    -					row.setField(1, "kafka-" + cnt);
    -					ctx.collect(row);
    -					cnt++;
    -				}
    -			}
    -
    -			@Override
    -			public void cancel() {
    -				running = false;
    -			}
    -		})
    -		.setParallelism(1);
    -
    -		KafkaTableSink kafkaTableSinkBase = createTableSink();
    -
    -		kafkaTableSinkBase.emitDataStream(stream);
    +	@Test
    +	public void testCorrectProducerIsCreated() throws Exception {
    +		DataStream dataStream = mock(DataStream.class);
    +		KafkaTableSink kafkaTableSink = spy(createTableSink());
    +		kafkaTableSink.emitDataStream(dataStream);
    +
    +		verify(kafkaTableSink).createKafkaProducer(
    +			eq(TOPIC),
    +			eq(createSinkProperties()),
    +			any(JsonRowSerializationSchema.class),
    +			any(CustomPartitioner.class));
     	}
     
    -	private void createConsumingTopology(StreamExecutionEnvironment env) {
    -		DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema();
    -
    -		FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps);
    -
    -		env.addSource(source).setParallelism(PARALLELISM)
    -			.map(new RichMapFunction<Row, Integer>() {
    -				@Override
    -				public Integer map(Row value) {
    -					return (Integer) value.productElement(0);
    -				}
    -			}).setParallelism(PARALLELISM)
    -
    -			.addSink(new SinkFunction<Integer>() {
    -				HashSet<Integer> ids = new HashSet<>();
    -				@Override
    -				public void invoke(Integer value) throws Exception {
    -					ids.add(value);
    -
    -					if (ids.size() == 100) {
    -						throw new SuccessException();
    -					}
    -				}
    -			}).setParallelism(1);
    +	@Test
    +	public void testConfigure() {
    +		KafkaTableSink kafkaTableSink = createTableSink();
    +		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
    +		assertNotSame(kafkaTableSink, newKafkaTableSink);
    +
    +		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
    +		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
    +		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
     	}
     
     	protected KafkaPartitioner<Row> createPartitioner() {
    --- End diff --
    
    Please add the partitioner as a final member variable to the test base and use it here.
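    
    A minimal sketch of what that could look like, assuming the field is shared by createPartitioner() and the verification (names here are illustrative, not taken from the PR):
    
        // Sketch: keep a single partitioner instance in a final field so the sink
        // under test and the verify(...) call see the same object.
        private final KafkaPartitioner<Row> partitioner = new CustomPartitioner();
    
        protected KafkaPartitioner<Row> createPartitioner() {
            return partitioner;
        }
    
    The verification in testCorrectProducerIsCreated could then use eq(partitioner) instead of any(CustomPartitioner.class).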



[GitHub] flink issue #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2430
  
    Hey @fhueske 
    
    I've updated the PR and implemented most of your suggestions. I am not sure how to check that the right producer version is used, though.
    The problem is that to check the producer type we need to create an instance of it, and when an instance is created it immediately tries to connect to Kafka. I could start a test Kafka broker to overcome this, but that seems like overkill and a step in the opposite direction.
    
    What do you think about this?
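    
    For what it is worth, one possible workaround (only a sketch, assuming org.mockito.Mockito.doReturn is statically imported; this is not what the PR does) is to stub the factory method on a spy so that no real producer is ever constructed and therefore no connection to Kafka is attempted. It verifies the wiring, but it still does not assert which concrete producer class the real factory would return:
    
        // Hypothetical sketch: stub createKafkaProducer() so emitDataStream()
        // never instantiates a real producer (and never connects to Kafka).
        KafkaTableSink sink = spy(createTableSink());
        FlinkKafkaProducerBase<Row> producer = mock(FlinkKafkaProducerBase.class);
        doReturn(producer).when(sink).createKafkaProducer(
            any(String.class),
            any(Properties.class),
            any(JsonRowSerializationSchema.class),
            any(KafkaPartitioner.class));
    
        DataStream<Row> dataStream = mock(DataStream.class);
        sink.emitDataStream(dataStream);
    
        // The sink must add exactly the instance returned by the factory.
        verify(dataStream).addSink(producer);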




[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2430#discussion_r80784331
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
    @@ -17,123 +17,79 @@
      */
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.apache.flink.api.common.functions.RichMapFunction;
    -import org.apache.flink.api.common.restartstrategy.RestartStrategies;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
     import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
     import org.apache.flink.streaming.api.datastream.DataStream;
    -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    -import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
     import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    -import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    -import org.apache.flink.test.util.SuccessException;
    +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
     import org.junit.Test;
     
     import java.io.Serializable;
    -import java.util.HashSet;
     import java.util.Properties;
     
    -import static org.apache.flink.test.util.TestUtils.tryExecute;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotSame;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.any;
    +import static org.mockito.Mockito.spy;
    +import static org.mockito.Mockito.verify;
     
    -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
    +public abstract class KafkaTableSinkTestBase implements Serializable {
     
    -	protected final static String TOPIC = "customPartitioningTestTopic";
    -	protected final static int PARALLELISM = 1;
    +	protected final static String TOPIC = "testTopic";
     	protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
     	protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
     
    +	protected FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
    +
     	@Test
     	public void testKafkaTableSink() throws Exception {
    -		LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
    -
    -		createTestTopic(TOPIC, PARALLELISM, 1);
    -		StreamExecutionEnvironment env = createEnvironment();
    -
    -		createProducingTopology(env);
    -		createConsumingTopology(env);
    -
    -		tryExecute(env, "custom partitioning test");
    -		deleteTestTopic(TOPIC);
    -		LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
    -	}
    +		DataStream dataStream = mock(DataStream.class);
    +		KafkaTableSink kafkaTableSink = createTableSink();
    +		kafkaTableSink.emitDataStream(dataStream);
     
    -	private StreamExecutionEnvironment createEnvironment() {
    -		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    -		env.setRestartStrategy(RestartStrategies.noRestart());
    -		env.getConfig().disableSysoutLogging();
    -		return env;
    +		verify(dataStream).addSink(kafkaProducer);
     	}
     
    -	private void createProducingTopology(StreamExecutionEnvironment env) {
    -		DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
    -			private boolean running = true;
    -
    -			@Override
    -			public void run(SourceContext<Row> ctx) throws Exception {
    -				long cnt = 0;
    -				while (running) {
    -					Row row = new Row(2);
    -					row.setField(0, cnt);
    -					row.setField(1, "kafka-" + cnt);
    -					ctx.collect(row);
    -					cnt++;
    -				}
    -			}
    -
    -			@Override
    -			public void cancel() {
    -				running = false;
    -			}
    -		})
    -		.setParallelism(1);
    -
    -		KafkaTableSink kafkaTableSinkBase = createTableSink();
    -
    -		kafkaTableSinkBase.emitDataStream(stream);
    +	@Test
    +	public void testCorrectProducerIsCreated() throws Exception {
    +		DataStream dataStream = mock(DataStream.class);
    +		KafkaTableSink kafkaTableSink = spy(createTableSink());
    +		kafkaTableSink.emitDataStream(dataStream);
    +
    +		verify(kafkaTableSink).createKafkaProducer(
    +			eq(TOPIC),
    +			eq(createSinkProperties()),
    +			any(JsonRowSerializationSchema.class),
    +			any(CustomPartitioner.class));
     	}
     
    -	private void createConsumingTopology(StreamExecutionEnvironment env) {
    -		DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema();
    -
    -		FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps);
    -
    -		env.addSource(source).setParallelism(PARALLELISM)
    -			.map(new RichMapFunction<Row, Integer>() {
    -				@Override
    -				public Integer map(Row value) {
    -					return (Integer) value.productElement(0);
    -				}
    -			}).setParallelism(PARALLELISM)
    -
    -			.addSink(new SinkFunction<Integer>() {
    -				HashSet<Integer> ids = new HashSet<>();
    -				@Override
    -				public void invoke(Integer value) throws Exception {
    -					ids.add(value);
    -
    -					if (ids.size() == 100) {
    -						throw new SuccessException();
    -					}
    -				}
    -			}).setParallelism(1);
    +	@Test
    +	public void testConfigure() {
    +		KafkaTableSink kafkaTableSink = createTableSink();
    +		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
    +		assertNotSame(kafkaTableSink, newKafkaTableSink);
    +
    +		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
    +		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
    +		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
     	}
     
     	protected KafkaPartitioner<Row> createPartitioner() {
     		return new CustomPartitioner();
     	}
     
     	protected Properties createSinkProperties() {
    --- End diff --
    
    Please add a non-empty Properties object as a final member variable to the test base and use it. This will make the equals check more meaningful.
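    
    A minimal sketch; the property key is a standard Kafka client setting, and the value is purely illustrative:
    
        // Sketch: a fixed, non-empty Properties object as a shared member makes the
        // eq(...) check compare real configuration values instead of two empty objects.
        private final Properties properties = createSinkProperties();
    
        protected Properties createSinkProperties() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:12345"); // illustrative value
            return props;
        }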



[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2430#discussion_r80783113
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
    @@ -17,123 +17,79 @@
      */
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.apache.flink.api.common.functions.RichMapFunction;
    -import org.apache.flink.api.common.restartstrategy.RestartStrategies;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
     import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
     import org.apache.flink.streaming.api.datastream.DataStream;
    -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    -import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
     import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    -import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    -import org.apache.flink.test.util.SuccessException;
    +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
     import org.junit.Test;
     
     import java.io.Serializable;
    -import java.util.HashSet;
     import java.util.Properties;
     
    -import static org.apache.flink.test.util.TestUtils.tryExecute;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotSame;
    +import static org.mockito.Matchers.eq;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.any;
    +import static org.mockito.Mockito.spy;
    +import static org.mockito.Mockito.verify;
     
    -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
    +public abstract class KafkaTableSinkTestBase implements Serializable {
     
    -	protected final static String TOPIC = "customPartitioningTestTopic";
    -	protected final static int PARALLELISM = 1;
    +	protected final static String TOPIC = "testTopic";
     	protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
     	protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
     
    +	protected FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
    +
     	@Test
     	public void testKafkaTableSink() throws Exception {
    -		LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
    -
    -		createTestTopic(TOPIC, PARALLELISM, 1);
    -		StreamExecutionEnvironment env = createEnvironment();
    -
    -		createProducingTopology(env);
    -		createConsumingTopology(env);
    -
    -		tryExecute(env, "custom partitioning test");
    -		deleteTestTopic(TOPIC);
    -		LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
    -	}
    +		DataStream dataStream = mock(DataStream.class);
    +		KafkaTableSink kafkaTableSink = createTableSink();
    +		kafkaTableSink.emitDataStream(dataStream);
     
    -	private StreamExecutionEnvironment createEnvironment() {
    -		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    -		env.setRestartStrategy(RestartStrategies.noRestart());
    -		env.getConfig().disableSysoutLogging();
    -		return env;
    +		verify(dataStream).addSink(kafkaProducer);
     	}
     
    -	private void createProducingTopology(StreamExecutionEnvironment env) {
    -		DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
    -			private boolean running = true;
    -
    -			@Override
    -			public void run(SourceContext<Row> ctx) throws Exception {
    -				long cnt = 0;
    -				while (running) {
    -					Row row = new Row(2);
    -					row.setField(0, cnt);
    -					row.setField(1, "kafka-" + cnt);
    -					ctx.collect(row);
    -					cnt++;
    -				}
    -			}
    -
    -			@Override
    -			public void cancel() {
    -				running = false;
    -			}
    -		})
    -		.setParallelism(1);
    -
    -		KafkaTableSink kafkaTableSinkBase = createTableSink();
    -
    -		kafkaTableSinkBase.emitDataStream(stream);
    +	@Test
    +	public void testCorrectProducerIsCreated() throws Exception {
    +		DataStream dataStream = mock(DataStream.class);
    +		KafkaTableSink kafkaTableSink = spy(createTableSink());
    +		kafkaTableSink.emitDataStream(dataStream);
    +
    +		verify(kafkaTableSink).createKafkaProducer(
    +			eq(TOPIC),
    +			eq(createSinkProperties()),
    +			any(JsonRowSerializationSchema.class),
    +			any(CustomPartitioner.class));
     	}
     
    -	private void createConsumingTopology(StreamExecutionEnvironment env) {
    -		DeserializationSchema<Row> deserializationSchema = createRowDeserializationSchema();
    -
    -		FlinkKafkaConsumerBase<Row> source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps);
    -
    -		env.addSource(source).setParallelism(PARALLELISM)
    -			.map(new RichMapFunction<Row, Integer>() {
    -				@Override
    -				public Integer map(Row value) {
    -					return (Integer) value.productElement(0);
    -				}
    -			}).setParallelism(PARALLELISM)
    -
    -			.addSink(new SinkFunction<Integer>() {
    -				HashSet<Integer> ids = new HashSet<>();
    -				@Override
    -				public void invoke(Integer value) throws Exception {
    -					ids.add(value);
    -
    -					if (ids.size() == 100) {
    -						throw new SuccessException();
    -					}
    -				}
    -			}).setParallelism(1);
    +	@Test
    +	public void testConfigure() {
    +		KafkaTableSink kafkaTableSink = createTableSink();
    +		KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
    +		assertNotSame(kafkaTableSink, newKafkaTableSink);
    +
    +		assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
    +		assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
    +		assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
     	}
     
     	protected KafkaPartitioner<Row> createPartitioner() {
     		return new CustomPartitioner();
     	}
     
     	protected Properties createSinkProperties() {
    -		return FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings);
    +		return new Properties();
     	}
     
     	protected abstract KafkaTableSink createTableSink();
    --- End diff --
    
    I would add the required parameters of a KafkaJsonTableSink here (topic, props, partitioner). That will make the code a bit easier to understand, IMO.
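    
    For example, the abstract factory could take the shape sketched below; Kafka09JsonTableSink in the usage comment is only meant as an illustration of a concrete subclass:
    
        // Sketch: passing topic, properties and partitioner explicitly makes the
        // version-specific subclasses easier to follow.
        protected abstract KafkaTableSink createTableSink(
            String topic,
            Properties properties,
            KafkaPartitioner<Row> partitioner);
    
        // e.g. in a Kafka 0.9 test subclass (illustrative):
        // return new Kafka09JsonTableSink(topic, properties, partitioner);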



[GitHub] flink issue #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2430
  
    Hi @mushketyk, thanks for reworking the tests and sorry that the review took a while.
    I added a comment. Please let me know if you have questions.
    
    Thanks, Fabian



[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2430#discussion_r79390399
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---
    @@ -17,123 +17,60 @@
      */
     package org.apache.flink.streaming.connectors.kafka;
     
    -import org.apache.flink.api.common.functions.RichMapFunction;
    -import org.apache.flink.api.common.restartstrategy.RestartStrategies;
     import org.apache.flink.api.common.typeinfo.TypeInformation;
     import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.typeutils.RowTypeInfo;
     import org.apache.flink.streaming.api.datastream.DataStream;
    -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    -import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    -import org.apache.flink.streaming.api.functions.source.SourceFunction;
     import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil;
     import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    -import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    -import org.apache.flink.test.util.SuccessException;
     import org.junit.Test;
     
     import java.io.Serializable;
    -import java.util.HashSet;
     import java.util.Properties;
     
    -import static org.apache.flink.test.util.TestUtils.tryExecute;
    +import static org.junit.Assert.*;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.verify;
     
    -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable {
    +public abstract class KafkaTableSinkTestBase implements Serializable {
     
    -	protected final static String TOPIC = "customPartitioningTestTopic";
    -	protected final static int PARALLELISM = 1;
    +	protected final static String TOPIC = "testTopic";
     	protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"};
     	protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class});
     
    +	protected FlinkKafkaProducerBase<Row> kafkaProducer = mock(FlinkKafkaProducerBase.class);
    +
     	@Test
     	public void testKafkaTableSink() throws Exception {
    -		LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()");
    -
    -		createTestTopic(TOPIC, PARALLELISM, 1);
    -		StreamExecutionEnvironment env = createEnvironment();
    -
    -		createProducingTopology(env);
    -		createConsumingTopology(env);
    -
    -		tryExecute(env, "custom partitioning test");
    -		deleteTestTopic(TOPIC);
    -		LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()");
    -	}
    -
    -	private StreamExecutionEnvironment createEnvironment() {
    -		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    -		env.setRestartStrategy(RestartStrategies.noRestart());
    -		env.getConfig().disableSysoutLogging();
    -		return env;
    -	}
    -
    -	private void createProducingTopology(StreamExecutionEnvironment env) {
    -		DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
    -			private boolean running = true;
    -
    -			@Override
    -			public void run(SourceContext<Row> ctx) throws Exception {
    -				long cnt = 0;
    -				while (running) {
    -					Row row = new Row(2);
    -					row.setField(0, cnt);
    -					row.setField(1, "kafka-" + cnt);
    -					ctx.collect(row);
    -					cnt++;
    -				}
    -			}
    +		DataStream dataStream = mock(DataStream.class);
    +		KafkaTableSink kafkaTableSink = createTableSink();
    +		kafkaTableSink.emitDataStream(dataStream);
     
    -			@Override
    -			public void cancel() {
    -				running = false;
    -			}
    -		})
    -		.setParallelism(1);
    -
    -		KafkaTableSink kafkaTableSinkBase = createTableSink();
    -
    -		kafkaTableSinkBase.emitDataStream(stream);
    +		verify(dataStream).addSink(kafkaProducer);
    --- End diff --
    
    This test checks that `addSink` was called with a `FlinkKafkaProducerBase` object. I think it would be good to extend the test to make sure that
    - the right producer was used (Kafka 0.8, 0.9)
    - the correct topic is fetched
    - the properties are correctly passed on
    - the correct serialization scheme is used
    - the correct partitioner is used
    
    Would that be possible?
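    
    As a rough illustration of how these checks could look with a Mockito spy (matcher choices are only a sketch; this essentially mirrors the testCorrectProducerIsCreated test quoted earlier in this thread):
    
        // Sketch: spy the sink and verify the arguments handed to the producer factory.
        KafkaTableSink sink = spy(createTableSink());
        DataStream<Row> dataStream = mock(DataStream.class);
        sink.emitDataStream(dataStream);
    
        verify(sink).createKafkaProducer(
            eq(TOPIC),                              // correct topic
            eq(createSinkProperties()),             // properties passed on
            any(JsonRowSerializationSchema.class),  // correct serialization schema
            any(CustomPartitioner.class));          // correct partitioner
    
    The question of asserting the concrete producer version (0.8 vs. 0.9) is taken up separately in this thread.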



[GitHub] flink issue #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2430
  
    Hi @fhueske 
    Thank you for your review. I've updated my PR accordingly.
    




[GitHub] flink pull request #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2430



[GitHub] flink issue #2430: [FLINK-3874] Rewrite Kafka JSON Table sink tests

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2430
  
    Hi @fhueske,
    
    Thank you for your review. I'll take a look at the code, see whether these checks are possible to implement, and come back with a reply.

