Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 21:04:51 UTC

[flink] branch master updated (79aa7d1 -> 812d3d1)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 79aa7d1  [FLINK-17702][scheduler] Cancellations during failover also notify the OperatorCoordinator as "failed tasks"
     new cce715b  [FLINK-7267][connectors/rabbitmq] Allow overriding RMQSource connection
     new 6fa85fe  [FLINK-7267][connectors/rabbitmq] Allow overriding RMQSink connection
     new 1f1d756  [hotfix][docs] Fix typos in config option descriptions
     new 080248d  [hotfix][runtime] Remove useless local variable in CompletedCheckpointStoreTest#testAddCheckpointMoreThanMaxRetained
     new b0f985e  [hotfix][docs] Fix some ScalaDocs in ExecutionEnvironment.scala
     new 812d3d1  [FLINK-17582][quickstarts] Update quickstarts to use universal Kafka connector

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/optimizer_configuration.html         |  2 +-
 .../rocks_db_configurable_configuration.html       |  2 +-
 .../streaming/connectors/rabbitmq/RMQSink.java     | 19 ++++++++++++++--
 .../streaming/connectors/rabbitmq/RMQSource.java   | 13 ++++++++---
 .../streaming/connectors/rabbitmq/RMQSinkTest.java | 19 ++++++++++++++++
 .../connectors/rabbitmq/RMQSourceTest.java         | 26 +++++++++++++++++++++-
 .../flink/configuration/OptimizerOptions.java      |  2 +-
 .../src/main/resources/archetype-resources/pom.xml |  2 +-
 .../src/main/resources/archetype-resources/pom.xml |  2 +-
 .../checkpoint/CompletedCheckpointStoreTest.java   |  2 --
 .../flink/api/scala/ExecutionEnvironment.scala     |  2 +-
 .../state/RocksDBConfigurableOptions.java          |  2 +-
 12 files changed, 78 insertions(+), 15 deletions(-)


[flink] 01/06: [FLINK-7267][connectors/rabbitmq] Allow overriding RMQSource connection

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cce715b78ca85b8eee258f32b1e6fb366ca56998
Author: austin ce <au...@gmail.com>
AuthorDate: Mon May 11 19:40:51 2020 -0400

    [FLINK-7267][connectors/rabbitmq] Allow overriding RMQSource connection
---
 .../streaming/connectors/rabbitmq/RMQSource.java   | 13 ++++++++---
 .../connectors/rabbitmq/RMQSourceTest.java         | 26 +++++++++++++++++++++-
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index ab7bdc4..e32ce2d 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -127,13 +127,21 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 
 	/**
 	 * Initializes the connection to RMQ with a default connection factory. The user may override
-	 * this method to setup and configure their own ConnectionFactory.
+	 * this method to setup and configure their own {@link ConnectionFactory}.
 	 */
 	protected ConnectionFactory setupConnectionFactory() throws Exception {
 		return rmqConnectionConfig.getConnectionFactory();
 	}
 
 	/**
+	 * Initializes the connection to RMQ using the default connection factory from {@link #setupConnectionFactory()}.
+	 * The user may override this method to setup and configure their own {@link Connection}.
+	 */
+	protected Connection setupConnection() throws Exception {
+		return setupConnectionFactory().newConnection();
+	}
+
+	/**
 	 * Sets up the queue. The default implementation just declares the queue. The user may override
 	 * this method to have a custom setup for the queue (i.e. binding the queue to an exchange or
 	 * defining custom queue parameters)
@@ -145,9 +153,8 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
 	@Override
 	public void open(Configuration config) throws Exception {
 		super.open(config);
-		ConnectionFactory factory = setupConnectionFactory();
 		try {
-			connection = factory.newConnection();
+			connection = setupConnection();
 			channel = connection.createChannel();
 			if (channel == null) {
 				throw new RuntimeException("None of RabbitMQ channels are available");
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index 925457f..b53723c 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -329,6 +329,26 @@ public class RMQSourceTest {
 		assertThat("Open method was not called", deserializationSchema.isOpenCalled(), is(true));
 	}
 
+	@Test
+	public void testOverrideConnection() throws Exception {
+		final Connection mockConnection = Mockito.mock(Connection.class);
+		Channel channel = Mockito.mock(Channel.class);
+		Mockito.when(mockConnection.createChannel()).thenReturn(channel);
+
+		RMQMockedRuntimeTestSource source = new RMQMockedRuntimeTestSource() {
+			@Override
+			protected Connection setupConnection() throws Exception {
+				return mockConnection;
+			}
+		};
+
+		FunctionInitializationContext mockContext = getMockContext();
+		source.initializeState(mockContext);
+		source.open(new Configuration());
+
+		Mockito.verify(mockConnection, Mockito.times(1)).createChannel();
+	}
+
 	private static class ConstructorTestClass extends RMQSource<String> {
 
 		private ConnectionFactory factory;
@@ -411,6 +431,10 @@ public class RMQSourceTest {
 			this(connectionConfig, new StringDeserializationScheme());
 		}
 
+		public RMQMockedRuntimeTestSource() {
+			this(new StringDeserializationScheme());
+		}
+
 		@Override
 		public RuntimeContext getRuntimeContext() {
 			return runtimeContext;
@@ -421,7 +445,7 @@ public class RMQSourceTest {
 		private ArrayDeque<Tuple2<Long, Set<String>>> restoredState;
 
 		public RMQTestSource() {
-			this(new StringDeserializationScheme());
+			super();
 		}
 
 		public RMQTestSource(DeserializationSchema<String> deserializationSchema) {
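
As a minimal sketch (not part of this commit), a user could now subclass RMQSource and override the new setupConnection() hook, for example to connect against several broker addresses; the class name, queue, and addresses below are hypothetical placeholders. Commit 02/06 below adds the same hook to RMQSink.

    import com.rabbitmq.client.Address;
    import com.rabbitmq.client.Connection;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

    // Hypothetical user code: supplies its own Connection instead of the default
    // setupConnectionFactory().newConnection() path introduced in this commit.
    public class MultiBrokerRMQSource extends RMQSource<String> {

        public MultiBrokerRMQSource(RMQConnectionConfig config) {
            super(config, "my-queue", new SimpleStringSchema());
        }

        @Override
        protected Connection setupConnection() throws Exception {
            // Connect against an explicit address list; everything else (credentials,
            // virtual host, etc.) still comes from the configured connection factory.
            return setupConnectionFactory().newConnection(new Address[] {
                new Address("rabbitmq-1", 5672),
                new Address("rabbitmq-2", 5672)
            });
        }
    }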


[flink] 05/06: [hotfix][docs] Fix some ScalaDocs in ExecutionEnvironment.scala

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0f985e7497099861be48261be3fd25735d7e89c
Author: zhenxianyimeng <19...@qq.com>
AuthorDate: Mon May 11 11:05:09 2020 +0800

    [hotfix][docs] Fix some ScalaDocs in ExecutionEnvironment.scala
    
    This closes #12064
---
 .../main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 4d14d9b..309794b 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -214,7 +214,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    *
    * @param filePath The path of the file, as a URI (e.g., "file:///some/local/file" or
    *                 "hdfs://host:port/file/path").
-   * @param charsetName The name of the character set used to read the file. Default is UTF-0
+   * @param charsetName The name of the character set used to read the file. Default is UTF-8
    */
   def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String] = {
     require(filePath != null, "The file path may not be null.")
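
For reference, the charset parameter documented here can also be passed explicitly; a minimal sketch using the analogous overload of the Java DataSet API (the file path and charset are placeholders):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ReadTextFileWithCharset {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // UTF-8 is the default; pass a charset name to read differently encoded files.
            DataSet<String> lines = env.readTextFile("file:///tmp/input.txt", "ISO-8859-1");

            lines.print();
        }
    }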


[flink] 04/06: [hotfix][runtime] Remove useless local variable in CompletedCheckpointStoreTest#testAddCheckpointMoreThanMaxRetained

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 080248d7426fd3d9339c8bf7f998428a5adf0458
Author: felixzheng <fe...@gmail.com>
AuthorDate: Mon May 11 12:44:19 2020 +0800

    [hotfix][runtime] Remove useless local variable in CompletedCheckpointStoreTest#testAddCheckpointMoreThanMaxRetained
    
    This closes #12066
---
 .../apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java   | 2 --
 1 file changed, 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 17c470e..bdde348 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -105,8 +105,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
 
 		for (int i = 1; i < expected.length; i++) {
-			Collection<OperatorState> taskStates = expected[i - 1].getOperatorStates().values();
-
 			checkpoints.addCheckpoint(expected[i]);
 
 			// The ZooKeeper implementation discards asynchronously


[flink] 06/06: [FLINK-17582][quickstarts] Update quickstarts to use universal Kafka connector

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 812d3d13d8c7966d62f40b990470ec468a16c80e
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Fri May 8 14:55:04 2020 -0500

    [FLINK-17582][quickstarts] Update quickstarts to use universal Kafka connector
    
    This closes #12044
---
 .../src/main/resources/archetype-resources/pom.xml                      | 2 +-
 .../src/main/resources/archetype-resources/pom.xml                      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index c051ced..4100bab 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -79,7 +79,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
 		-->
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index f17f663..b40a779 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -86,7 +86,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
 			<version>${flink.version}</version>
 		</dependency>
 		-->
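
As context for the dependency change above, a minimal sketch of a quickstart job using the universal connector; the topic name, bootstrap servers, and group id are hypothetical placeholders:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class KafkaQuickstartJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "quickstart");

            // The universal connector provides a single version-agnostic FlinkKafkaConsumer,
            // replacing the version-specific classes from flink-connector-kafka-0.10.
            DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props));

            stream.print();
            env.execute("Kafka quickstart example");
        }
    }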


[flink] 03/06: [hotfix][docs] Fix typos in config option descriptions

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f1d75672729fc7ea237dccd153efce035f4f9f3
Author: wangxianghu <wx...@126.com>
AuthorDate: Sat May 16 22:32:41 2020 +0800

    [hotfix][docs] Fix typos in config option descriptions
    
    This closes #12191
---
 docs/_includes/generated/optimizer_configuration.html                   | 2 +-
 docs/_includes/generated/rocks_db_configurable_configuration.html       | 2 +-
 .../src/main/java/org/apache/flink/configuration/OptimizerOptions.java  | 2 +-
 .../flink/contrib/streaming/state/RocksDBConfigurableOptions.java       | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/_includes/generated/optimizer_configuration.html b/docs/_includes/generated/optimizer_configuration.html
index 3b49f01..7095a36 100644
--- a/docs/_includes/generated/optimizer_configuration.html
+++ b/docs/_includes/generated/optimizer_configuration.html
@@ -12,7 +12,7 @@
             <td><h5>compiler.delimited-informat.max-line-samples</h5></td>
             <td style="word-wrap: break-word;">10</td>
             <td>Integer</td>
-            <td>he maximum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters.</td>
+            <td>The maximum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters.</td>
         </tr>
         <tr>
             <td><h5>compiler.delimited-informat.max-sample-len</h5></td>
diff --git a/docs/_includes/generated/rocks_db_configurable_configuration.html b/docs/_includes/generated/rocks_db_configurable_configuration.html
index 518ce35..a22f5d2 100644
--- a/docs/_includes/generated/rocks_db_configurable_configuration.html
+++ b/docs/_includes/generated/rocks_db_configurable_configuration.html
@@ -66,7 +66,7 @@
             <td><h5>state.backend.rocksdb.writebuffer.count</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
-            <td>Tne maximum number of write buffers that are built up in memory. RocksDB has default configuration as '2'.</td>
+            <td>The maximum number of write buffers that are built up in memory. RocksDB has default configuration as '2'.</td>
         </tr>
         <tr>
             <td><h5>state.backend.rocksdb.writebuffer.number-to-merge</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/OptimizerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/OptimizerOptions.java
index 9e7ff68..cac4072 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/OptimizerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/OptimizerOptions.java
@@ -35,7 +35,7 @@ public class OptimizerOptions {
 	public static final ConfigOption<Integer> DELIMITED_FORMAT_MAX_LINE_SAMPLES =
 		key("compiler.delimited-informat.max-line-samples")
 			.defaultValue(10)
-			.withDescription("he maximum number of line samples taken by the compiler for delimited inputs. The samples" +
+			.withDescription("The maximum number of line samples taken by the compiler for delimited inputs. The samples" +
 				" are used to estimate the number of records. This value can be overridden for a specific input with the" +
 				" input format’s parameters.");
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index e725bbd..22b7ef2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -110,7 +110,7 @@ public class RocksDBConfigurableOptions implements Serializable {
 		key("state.backend.rocksdb.writebuffer.count")
 			.intType()
 			.noDefaultValue()
-			.withDescription("Tne maximum number of write buffers that are built up in memory. " +
+			.withDescription("The maximum number of write buffers that are built up in memory. " +
 				"RocksDB has default configuration as '2'.");
 
 	public static final ConfigOption<Integer> MIN_WRITE_BUFFER_NUMBER_TO_MERGE =


[flink] 02/06: [FLINK-7267][connectors/rabbitmq] Allow overriding RMQSink connection

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6fa85fea3c3dc1414c1aa4147744c82b3f4fede0
Author: austin ce <au...@gmail.com>
AuthorDate: Mon May 11 20:00:36 2020 -0400

    [FLINK-7267][connectors/rabbitmq] Allow overriding RMQSink connection
    
    This closes #12185
---
 .../flink/streaming/connectors/rabbitmq/RMQSink.java  | 19 +++++++++++++++++--
 .../streaming/connectors/rabbitmq/RMQSinkTest.java    | 19 +++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 5966713..0218df5 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -137,13 +137,28 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		this.logFailuresOnly = logFailuresOnly;
 	}
 
+	/**
+	 * Initializes the connection to RMQ with a default connection factory. The user may override
+	 * this method to setup and configure their own {@link ConnectionFactory}.
+	 */
+	protected ConnectionFactory setupConnectionFactory() throws Exception {
+		return rmqConnectionConfig.getConnectionFactory();
+	}
+
+	/**
+	 * Initializes the connection to RMQ using the default connection factory from {@link #setupConnectionFactory()}.
+	 * The user may override this method to setup and configure their own {@link Connection}.
+	 */
+	protected Connection setupConnection() throws Exception {
+		return setupConnectionFactory().newConnection();
+	}
+
 	@Override
 	public void open(Configuration config) throws Exception {
-		ConnectionFactory factory = rmqConnectionConfig.getConnectionFactory();
 		schema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
 
 		try {
-			connection = factory.newConnection();
+			connection = setupConnection();
 			channel = connection.createChannel();
 			if (channel == null) {
 				throw new RuntimeException("None of RabbitMQ channels are available");
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
index ea126d0..72fe1af 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -98,6 +99,24 @@ public class RMQSinkTest {
 	}
 
 	@Test
+	public void testOverrideConnection() throws Exception {
+		final Connection mockConnection = mock(Connection.class);
+		Channel channel = mock(Channel.class);
+		when(mockConnection.createChannel()).thenReturn(channel);
+
+		RMQSink<String> rmqSink = new RMQSink<String>(rmqConnectionConfig, QUEUE_NAME, serializationSchema) {
+			@Override
+			protected Connection setupConnection() throws Exception {
+				return mockConnection;
+			}
+		};
+
+		rmqSink.open(new Configuration());
+
+		verify(mockConnection, times(1)).createChannel();
+	}
+
+	@Test
 	public void throwExceptionIfChannelIsNull() throws Exception {
 		when(connection.createChannel()).thenReturn(null);
 		try {