Posted to issues@flink.apache.org by mushketyk <gi...@git.apache.org> on 2016/06/17 21:28:22 UTC

[GitHub] flink pull request #2128: [FLINK-4053] Return value from Connection should b...

GitHub user mushketyk opened a pull request:

    https://github.com/apache/flink/pull/2128

    [FLINK-4053] Return value from Connection should be checked against null

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink check-null

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2128.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2128
    
----
commit 06ecc0e817743e2b2b6bf6e582a600b0dd070fde
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-06-17T20:45:25Z

    [FLINK-4053] Add tests for SMQSink

commit 35a82d8780183146719cc7dac485e7878d016ad4
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-06-17T21:22:31Z

    [FLINK-4053] Check return value of RabbitMQ against null

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2128: [FLINK-4053] Return value from Connection should b...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2128#discussion_r68654363
  
    --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---
    @@ -76,6 +76,9 @@ public void open(Configuration config) throws Exception {
     		try {
     			connection = factory.newConnection();
     			channel = connection.createChannel();
    +			if (channel == null) {
    --- End diff --
    
    As far as I understand, in RabbitMQ every real TCP connection carries a number of virtual connections (channels) multiplexed on top of it. If we request a channel and none is available, "createChannel" returns null.
    I've updated the exception message to reflect that.
    
    I don't think this will generally be an issue, since we only request one channel per connection, but it would be better to throw an exception with an appropriate message than to fail with a NullPointerException if anything goes wrong.
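
The null check described in the comment above can be sketched roughly as follows. This is a minimal illustration, not the code merged in the PR: the `Channel` and `Connection` interfaces are hypothetical stand-ins for the RabbitMQ client types so that the snippet compiles on its own, `openChannel` is a made-up helper name, and the exception message is borrowed from the test quoted elsewhere in this thread.

```java
import java.io.IOException;

// Hypothetical stand-ins for com.rabbitmq.client.Channel and Connection,
// declared here only so the sketch compiles without the RabbitMQ client.
interface Channel {}

interface Connection {
    Channel createChannel() throws IOException;
}

class ChannelNullCheckSketch {

    // Requests a channel and fails fast with a descriptive message instead of
    // letting a later call on the channel hit a NullPointerException.
    static Channel openChannel(Connection connection) throws IOException {
        Channel channel = connection.createChannel();
        if (channel == null) {
            // Illustrative message; the wording in the real sink may differ.
            throw new RuntimeException("RabbitMQ connection returned null channel");
        }
        return channel;
    }

    public static void main(String[] args) throws IOException {
        // Simulates a connection that has no channel left to hand out.
        Connection exhausted = () -> null;
        try {
            openChannel(exhausted);
        } catch (RuntimeException e) {
            System.out.println("Failed as expected: " + e.getMessage());
        }
    }
}
```

Throwing at the point where the channel is requested keeps the failure close to its cause, which is the benefit the comment argues for.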



[GitHub] flink issue #2128: [FLINK-4053] Return value from Connection should be check...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2128
  
    Thank you @zentol !



[GitHub] flink issue #2128: [FLINK-4053] Return value from Connection should be check...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2128
  
    merging



[GitHub] flink issue #2128: [FLINK-4053] Return value from Connection should be check...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2128
  
    @zentol I see your point. Are you suggesting that I remove these tests?



[GitHub] flink pull request #2128: [FLINK-4053] Return value from Connection should b...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2128#discussion_r68406562
  
    --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/Utils.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.rabbitmq.common;
    +
    +import com.rabbitmq.client.Channel;
    +import com.rabbitmq.client.Connection;
    +
    +import java.io.IOException;
    +
    +/**
    + * Common utils for RabbitMQ streaming connector.
    + */
    +public class Utils {
    +	private Utils() {}
    +
    +	/**
    +	 *	Create RabbitMQ and check for null reference.
    +	 *
    +	 * @param connection RabbitMQ connection
    +	 * @return RabbitMQ channel
    +	 * @throws IOException if failed to open RabbitMQ channel
    +	 * @throws RuntimeException if connection object returned null channel
    +	 */
    +	public static Channel createChannel(Connection connection) throws IOException {
    --- End diff --
    
    A separate class and static method are a bit of overkill for a null check.



[GitHub] flink issue #2128: [FLINK-4053] Return value from Connection should be check...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2128
  
    @zentol Should I update this PR somehow?



[GitHub] flink pull request #2128: [FLINK-4053] Return value from Connection should b...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2128#discussion_r68408173
  
    --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java ---
    @@ -0,0 +1,125 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.rabbitmq.common;
    +
    +import com.rabbitmq.client.Channel;
    +import com.rabbitmq.client.Connection;
    +import com.rabbitmq.client.ConnectionFactory;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.mockito.Mockito.*;
    +
    +public class RMQSinkTest {
    +
    +	private static final String QUEUE_NAME = "queue";
    +	private static final String MESSAGE_STR = "msg";
    +	private static final byte[] MESSAGE = new byte[1];
    +
    +	private RMQConnectionConfig rmqConnectionConfig;
    +	private ConnectionFactory connectionFactory;
    +	private Connection connection;
    +	private Channel channel;
    +	private SerializationSchema<String> serializationSchema;
    +
    +
    +	@Before
    +	public void before() throws Exception {
    +		serializationSchema = spy(new DummySerializationSchema());
    +		rmqConnectionConfig = mock(RMQConnectionConfig.class);
    +		connectionFactory = mock(ConnectionFactory.class);
    +		connection = mock(Connection.class);
    +		channel = mock(Channel.class);
    +
    +		when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
    +		when(connectionFactory.newConnection()).thenReturn(connection);
    +		when(connection.createChannel()).thenReturn(channel);
    +	}
    +
    +	@Test
    +	public void openCallDeclaresQueue() throws Exception {
    +		createRMQSink();
    +
    +		verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null);
    +	}
    +
    +	@Test
    +	public void throwExceptionIfChannenIsNull() throws Exception {
    +		when(connection.createChannel()).thenReturn(null);
    +		try {
    +			createRMQSink();
    +		} catch (RuntimeException ex) {
    +			assertEquals("RabbitMQ connection returned null channel", ex.getMessage());
    +		}
    +	}
    +
    +	private RMQSink<String> createRMQSink() throws Exception {
    +		RMQSink rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
    +		rmqSink.open(new Configuration());
    +		return rmqSink;
    +	}
    +
    +	@Test
    +	public void invokePublishBytesToQueue() throws Exception {
    +		RMQSink<String> rmqSink = createRMQSink();
    +
    +		rmqSink.invoke(MESSAGE_STR);
    +		verify(serializationSchema).serialize(MESSAGE_STR);
    +		verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
    +	}
    +
    +	@Test(expected = RuntimeException.class)
    +	public void exceptionDuringPublishingIfNotIgnoredError() throws Exception {
    --- End diff --
    
    should this be called "exceptionDuringPublishingIsNotIgnored"?



[GitHub] flink issue #2128: [FLINK-4053] Return value from Connection should be check...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2128
  
    I wanted to avoid this bit of duplication, but I have no strong preference. I will remove this static method.



[GitHub] flink issue #2128: [FLINK-4053] Return value from Connection should be check...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2128
  
    Thank you for reminding me, I forgot about this PR a bit :(
    
    The PR is fine imo, +1 to merge.



[GitHub] flink issue #2128: [FLINK-4053] Return value from Connection should be check...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2128
  
    Hi all. If this is fine and useful, could someone please merge this?



[GitHub] flink pull request #2128: [FLINK-4053] Return value from Connection should b...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2128#discussion_r68580911
  
    --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java ---
    @@ -76,6 +76,9 @@ public void open(Configuration config) throws Exception {
     		try {
     			connection = factory.newConnection();
     			channel = connection.createChannel();
    +			if (channel == null) {
    --- End diff --
    
    Do you know under which circumstances this can occur and/or what a user can do to resolve it?



[GitHub] flink issue #2128: [FLINK-4053] Return value from Connection should be check...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2128
  
    I think this looks good.
    
    One remark for future contributions though: I think some of your tests are too specific. 
    These include `openCallDeclaresQueue`,  `throwExceptionIfChannelIsNull` and `invokePublishBytesToQueue`.
    
    Effectively you are not testing functionality but putting a lock on every single line of the implementation. 
    
    We will now get a test failure (which _should_ mean that the functionality is broken) if we change the exception message (or even handle it completely), use a different `basicPublish` method (or pass different arguments) or pass different `queueDeclare` arguments.
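
One possible reading of the remark above is sketched below: check that a null channel is rejected at all, without pinning the exact exception message. This is an illustration under assumptions, not the tests from the PR. `Channel` and `Connection` are hypothetical stand-ins for the RabbitMQ client types, `SketchSink` is a made-up, stripped-down sink, and plain Java is used instead of JUnit and Mockito so that the snippet runs on its own.

```java
import java.io.IOException;

// Hypothetical stand-ins for the RabbitMQ client types; not the real API.
interface Channel {}

interface Connection {
    Channel createChannel() throws IOException;
}

// Hypothetical, stripped-down sink that models only the open() behaviour
// discussed in this thread.
class SketchSink {
    private final Connection connection;
    private Channel channel;

    SketchSink(Connection connection) {
        this.connection = connection;
    }

    void open() throws IOException {
        channel = connection.createChannel();
        if (channel == null) {
            throw new RuntimeException("RabbitMQ connection returned null channel");
        }
    }

    boolean isOpen() {
        return channel != null;
    }
}

class BehaviourFocusedCheck {

    // Returns true when open() rejects a null channel with any RuntimeException.
    // The message text is deliberately not compared, so rewording it later
    // does not turn into a test failure.
    static boolean openFailsOnNullChannel() throws IOException {
        SketchSink sink = new SketchSink(() -> null);
        try {
            sink.open();
        } catch (RuntimeException expected) {
            return true;
        }
        return false; // no exception: the null channel went unnoticed
    }

    public static void main(String[] args) throws IOException {
        System.out.println("open() fails on null channel: " + openFailsOnNullChannel());
    }
}
```

The check also returns false when no exception is thrown at all, so it cannot pass silently if the null check is removed.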



[GitHub] flink pull request #2128: [FLINK-4053] Return value from Connection should b...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2128#discussion_r68406896
  
    --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java ---
    @@ -0,0 +1,125 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.rabbitmq.common;
    +
    +import com.rabbitmq.client.Channel;
    +import com.rabbitmq.client.Connection;
    +import com.rabbitmq.client.ConnectionFactory;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.mockito.Mockito.*;
    +
    +public class RMQSinkTest {
    +
    +	private static final String QUEUE_NAME = "queue";
    +	private static final String MESSAGE_STR = "msg";
    +	private static final byte[] MESSAGE = new byte[1];
    +
    +	private RMQConnectionConfig rmqConnectionConfig;
    +	private ConnectionFactory connectionFactory;
    +	private Connection connection;
    +	private Channel channel;
    +	private SerializationSchema<String> serializationSchema;
    +
    +
    +	@Before
    +	public void before() throws Exception {
    +		serializationSchema = spy(new DummySerializationSchema());
    +		rmqConnectionConfig = mock(RMQConnectionConfig.class);
    +		connectionFactory = mock(ConnectionFactory.class);
    +		connection = mock(Connection.class);
    +		channel = mock(Channel.class);
    +
    +		when(rmqConnectionConfig.getConnectionFactory()).thenReturn(connectionFactory);
    +		when(connectionFactory.newConnection()).thenReturn(connection);
    +		when(connection.createChannel()).thenReturn(channel);
    +	}
    +
    +	@Test
    +	public void openCallDeclaresQueue() throws Exception {
    +		createRMQSink();
    +
    +		verify(channel).queueDeclare(QUEUE_NAME, false, false, false, null);
    +	}
    +
    +	@Test
    +	public void throwExceptionIfChannenIsNull() throws Exception {
    --- End diff --
    
    typo: ChannelIsNull



[GitHub] flink pull request #2128: [FLINK-4053] Return value from Connection should b...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2128



[GitHub] flink issue #2128: [FLINK-4053] Return value from Connection should be check...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2128
  
    @zentol I've fixed my PR according to your comments.

