You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/10/22 14:11:30 UTC

[flink] branch master updated (56126bd -> c31e44e)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 56126bd  [FLINK-14447] Network metrics doc table render confusion
     new ee43395  [hotfix][kafka,test] Drop unused field in FailingIdentityMapper
     new 8a75dec  [hotfix][kafka,test] Remove unused statement from testOneToOneAtLeastOnce
     new 5ecb351  [hotfix][dist] Rename local variables in GlobalConfiguration
     new c31e44e  [FLINK-14235][kafka,tests] Change source in at-least-once test from finite to infinite

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connectors/kafka/KafkaProducerTestBase.java     | 21 +++++++++++++++++++--
 .../kafka/testutils/FailingIdentityMapper.java      |  2 --
 .../flink/configuration/GlobalConfiguration.java    | 14 +++++++-------
 3 files changed, 26 insertions(+), 11 deletions(-)


[flink] 02/04: [hotfix][kafka, test] Remove unused statement from testOneToOneAtLeastOnce

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a75dec1caa6803c79d4b0713cf27c4c17c46a22
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Oct 18 16:28:31 2019 +0200

    [hotfix][kafka,test] Remove unused statement from testOneToOneAtLeastOnce
---
 .../apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java   | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index a971386..99a19d2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -277,7 +277,6 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 			});
 		}
 
-		FailingIdentityMapper.failedBefore = false;
 		try {
 			env.execute("One-to-one at least once test");
 			fail("Job should fail!");


[flink] 03/04: [hotfix][dist] Rename local variables in GlobalConfiguration

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ecb35198cb81c4d9cdb6b8a68698db8ca0ec034
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Oct 21 10:16:50 2019 +0200

    [hotfix][dist] Rename local variables in GlobalConfiguration
---
 .../apache/flink/configuration/GlobalConfiguration.java    | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index c0debab..be0ffa0 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -139,23 +139,23 @@ public final class GlobalConfiguration {
 	}
 
 	private static void enrichWithEnvironmentVariable(String environmentVariable, Configuration configuration) {
-		String pluginsDirFromEnv = System.getenv(environmentVariable);
+		String valueFromEnv = System.getenv(environmentVariable);
 
-		if (pluginsDirFromEnv == null) {
+		if (valueFromEnv == null) {
 			return;
 		}
 
-		String pluginsDirFromConfig = configuration.getString(environmentVariable, pluginsDirFromEnv);
+		String valueFromConfig = configuration.getString(environmentVariable, valueFromEnv);
 
-		if (!pluginsDirFromEnv.equals(pluginsDirFromConfig)) {
+		if (!valueFromEnv.equals(valueFromConfig)) {
 			throw new IllegalConfigurationException(
-				"The given configuration file already contains a value (" + pluginsDirFromEnv +
+				"The given configuration file already contains a value (" + valueFromEnv +
 					") for the key (" + environmentVariable +
-					") that would have been overwritten with (" + pluginsDirFromConfig +
+					") that would have been overwritten with (" + valueFromConfig +
 					") by an environment with the same name.");
 		}
 
-		configuration.setString(environmentVariable, pluginsDirFromEnv);
+		configuration.setString(environmentVariable, valueFromEnv);
 	}
 
 	/**


[flink] 04/04: [FLINK-14235][kafka, tests] Change source in at-least-once test from finite to infinite

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c31e44e5402c5fd7deb10b83534740ac7f66d0f8
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Oct 18 16:38:26 2019 +0200

    [FLINK-14235][kafka,tests] Change source in at-least-once test from finite to infinite
    
    Previously it was possible that the source would end before a first chcekpoint could complete.
    If that was the case, any exceptions thrown during checkpointing are swallowed, which could explain
    the apparent data loss from FLINK-14235.
---
 .../connectors/kafka/KafkaProducerTestBase.java      | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 99a19d2..10e4f55 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -255,7 +255,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 
 		// process exactly failAfterElements number of elements and then shutdown Kafka broker and fail application
 		DataStream<Integer> inputStream = env
-			.fromCollection(getIntegersSequence(numElements))
+			.addSource(new InfiniteIntegerSource())
 			.map(new BrokerRestartingMapper<>(failAfterElements));
 
 		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
@@ -538,4 +538,22 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 		public void initializeState(FunctionInitializationContext context) throws Exception {
 		}
 	}
+
+	private static final class InfiniteIntegerSource implements SourceFunction<Integer> {
+
+		private volatile boolean running = true;
+		private int counter = 0;
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			while (running) {
+				ctx.collect(counter++);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
 }


[flink] 01/04: [hotfix][kafka, test] Drop unused field in FailingIdentityMapper

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ee433955575abed85a043e3b5a837028ed88ccd4
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Oct 18 16:27:32 2019 +0200

    [hotfix][kafka,test] Drop unused field in FailingIdentityMapper
---
 .../streaming/connectors/kafka/testutils/FailingIdentityMapper.java     | 2 --
 1 file changed, 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
index c25eefb..bd412c9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/FailingIdentityMapper.java
@@ -42,7 +42,6 @@ public class FailingIdentityMapper<T> extends RichMapFunction<T, T> implements
 	private static final long serialVersionUID = 6334389850158707313L;
 
 	public static volatile boolean failedBefore;
-	public static volatile boolean hasBeenCheckpointedBeforeFailure;
 
 	private final int failCount;
 	private int numElementsTotal;
@@ -74,7 +73,6 @@ public class FailingIdentityMapper<T> extends RichMapFunction<T, T> implements
 			Thread.sleep(10);
 
 			if (failer && numElementsTotal >= failCount) {
-				hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
 				failedBefore = true;
 				throw new Exception("Artificial Test Failure");
 			}