You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/02/05 19:17:13 UTC

[flink] branch master updated (7d06b11 -> e12512f)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7d06b11  [FLINK-15811][task] report CancelTaskException on SourceStreamTask cancellation
     new f6f5370  [hotfix][docs] Improve description of 'high-availability.jobmanager.port' config option.
     new d8f0418  [hotfix][runtime] Clean up minor issues in TaskInformation
     new 5727e88  [FLINK-15920][build] Show thread names in logs by default
     new e12512f  [hotfix][runtime] Small improvements in log messages for Task and RocksDB Backend

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../generated/expert_high_availability_section.html      |  2 +-
 .../generated/high_availability_configuration.html       |  2 +-
 .../flink/configuration/HighAvailabilityOptions.java     |  8 +++++++-
 .../flink/runtime/executiongraph/TaskInformation.java    | 16 ++++++++--------
 .../apache/flink/runtime/taskexecutor/TaskExecutor.java  |  2 +-
 .../streaming/state/RocksDBKeyedStateBackend.java        |  4 ++--
 tools/log4j-travis.properties                            |  4 ++--
 7 files changed, 22 insertions(+), 16 deletions(-)


[flink] 02/04: [hotfix][runtime] Clean up minor issues in TaskInformation

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8f041829f52869dc408030ab3456719f01626c9
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Feb 5 12:46:43 2020 +0100

    [hotfix][runtime] Clean up minor issues in TaskInformation
    
      - Checkstyle violations in JavaDocs
      - Remove use of 'checkNotNull' on primitive types, which looks like a confusion/oversight.
        It is not checking anything and results in unnecessary boxing/unboxing.
---
 .../flink/runtime/executiongraph/TaskInformation.java    | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
index 9dc9a04..6410e6b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
@@ -32,22 +32,22 @@ public class TaskInformation implements Serializable {
 
 	private static final long serialVersionUID = -9006218793155953789L;
 
-	/** Job vertex id of the associated job vertex */
+	/** Job vertex id of the associated job vertex. */
 	private final JobVertexID jobVertexId;
 
-	/** Name of the task */
+	/** Name of the task. */
 	private final String taskName;
 
-	/** The number of subtasks for this operator */
+	/** The number of subtasks for this operator. */
 	private final int numberOfSubtasks;
 
-	/** The maximum parallelism == number of key groups */
+	/** The maximum parallelism == number of key groups. */
 	private final int maxNumberOfSubtasks;
 
-	/** Class name of the invokable to run */
+	/** Class name of the invokable to run. */
 	private final String invokableClassName;
 
-	/** Configuration for the task */
+	/** Configuration for the task. */
 	private final Configuration taskConfiguration;
 
 	public TaskInformation(
@@ -59,8 +59,8 @@ public class TaskInformation implements Serializable {
 			Configuration taskConfiguration) {
 		this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
 		this.taskName = Preconditions.checkNotNull(taskName);
-		this.numberOfSubtasks = Preconditions.checkNotNull(numberOfSubtasks);
-		this.maxNumberOfSubtasks = Preconditions.checkNotNull(maxNumberOfSubtasks);
+		this.numberOfSubtasks = numberOfSubtasks;
+		this.maxNumberOfSubtasks = maxNumberOfSubtasks;
 		this.invokableClassName = Preconditions.checkNotNull(invokableClassName);
 		this.taskConfiguration = Preconditions.checkNotNull(taskConfiguration);
 	}


[flink] 03/04: [FLINK-15920][build] Show thread names in logs by default

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5727e889cf3326bc9d61596ced1d84d62c33b2fc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Feb 5 13:12:00 2020 +0100

    [FLINK-15920][build] Show thread names in logs by default
    
    This closes #11023
---
 tools/log4j-travis.properties | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/log4j-travis.properties b/tools/log4j-travis.properties
index 1095a3f..b39af9a 100644
--- a/tools/log4j-travis.properties
+++ b/tools/log4j-travis.properties
@@ -23,7 +23,7 @@ log4j.rootLogger=INFO, file
 # -----------------------------------------------------------------------------
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n
 
 # -----------------------------------------------------------------------------
 # File (use 'file')
@@ -32,7 +32,7 @@ log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.dir}/mvn-${mvn.forkNumber}.log
 log4j.appender.file.append=true
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} [%20t] %-5p %-60c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR


[flink] 01/04: [hotfix][docs] Improve description of 'high-availability.jobmanager.port' config option.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6f5370532c9a0e3ec1de9a0d73ac3f58669bfe6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Feb 5 12:43:45 2020 +0100

    [hotfix][docs] Improve description of 'high-availability.jobmanager.port' config option.
---
 docs/_includes/generated/expert_high_availability_section.html    | 2 +-
 docs/_includes/generated/high_availability_configuration.html     | 2 +-
 .../org/apache/flink/configuration/HighAvailabilityOptions.java   | 8 +++++++-
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/docs/_includes/generated/expert_high_availability_section.html b/docs/_includes/generated/expert_high_availability_section.html
index 6ddbeb5..4a571e7 100644
--- a/docs/_includes/generated/expert_high_availability_section.html
+++ b/docs/_includes/generated/expert_high_availability_section.html
@@ -12,7 +12,7 @@
             <td><h5>high-availability.jobmanager.port</h5></td>
             <td style="word-wrap: break-word;">"0"</td>
             <td>String</td>
-            <td>Optional port (range) used by the job manager in high-availability mode.</td>
+            <td>The port (range) used by the Flink Master for its RPC connections in highly-available setups. In highly-available setups, this value is used instead of 'jobmanager.rpc.port'.A value of '0' means that a random free port is chosen. TaskManagers discover this port through the high-availability services (leader election), so a random port or a port range works without requiring any additional means of service discovery.</td>
         </tr>
     </tbody>
 </table>
diff --git a/docs/_includes/generated/high_availability_configuration.html b/docs/_includes/generated/high_availability_configuration.html
index 86573d9..cf61274 100644
--- a/docs/_includes/generated/high_availability_configuration.html
+++ b/docs/_includes/generated/high_availability_configuration.html
@@ -24,7 +24,7 @@
             <td><h5>high-availability.jobmanager.port</h5></td>
             <td style="word-wrap: break-word;">"0"</td>
             <td>String</td>
-            <td>Optional port (range) used by the job manager in high-availability mode.</td>
+            <td>The port (range) used by the Flink Master for its RPC connections in highly-available setups. In highly-available setups, this value is used instead of 'jobmanager.rpc.port'.A value of '0' means that a random free port is chosen. TaskManagers discover this port through the high-availability services (leader election), so a random port or a port range works without requiring any additional means of service discovery.</td>
         </tr>
         <tr>
             <td><h5>high-availability.storageDir</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
index 8930d00..86b8232 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/HighAvailabilityOptions.java
@@ -79,9 +79,15 @@ public class HighAvailabilityOptions {
 	@Documentation.Section(Documentation.Sections.EXPERT_HIGH_AVAILABILITY)
 	public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
 			key("high-availability.jobmanager.port")
+			.stringType()
 			.defaultValue("0")
 			.withDeprecatedKeys("recovery.jobmanager.port")
-			.withDescription("Optional port (range) used by the job manager in high-availability mode.");
+			.withDescription(
+					"The port (range) used by the Flink Master for its RPC connections in highly-available setups. " +
+					"In highly-available setups, this value is used instead of '" + JobManagerOptions.PORT.key() + "'." +
+					"A value of '0' means that a random free port is chosen. TaskManagers discover this port through " +
+					"the high-availability services (leader election), so a random port or a port range works " +
+					"without requiring any additional means of service discovery.");
 
 	// ------------------------------------------------------------------------
 	//  ZooKeeper Options


[flink] 04/04: [hotfix][runtime] Small improvements in log messages for Task and RocksDB Backend

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e12512f21b50584ef2c2ce9a25de3b3a8b47e149
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Feb 5 15:26:18 2020 +0100

    [hotfix][runtime] Small improvements in log messages for Task and RocksDB Backend
---
 .../main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java | 2 +-
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java       | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 6de6747..67acbd8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1451,7 +1451,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			}
 
 			log.info("Un-registering task and sending final execution state {} to JobManager for task {} {}.",
-				task.getExecutionState(), task.getTaskInfo().getTaskName(), task.getExecutionId());
+				task.getExecutionState(), task.getTaskInfo().getTaskNameWithSubtasks(), task.getExecutionId());
 
 			AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot();
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6f27f11..2ddb79b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -381,12 +381,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	private void cleanInstanceBasePath() {
-		LOG.info("Deleting existing instance base directory {}.", instanceBasePath);
+		LOG.info("Closed RocksDB State Backend. Cleaning up RocksDB working directory {}.", instanceBasePath);
 
 		try {
 			FileUtils.deleteDirectory(instanceBasePath);
 		} catch (IOException ex) {
-			LOG.warn("Could not delete instance base path for RocksDB: " + instanceBasePath, ex);
+			LOG.warn("Could not delete RocksDB working directory: {}", instanceBasePath, ex);
 		}
 	}