You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/02 12:35:56 UTC

[flink] branch release-1.11 updated (33f8524 -> 4ff98f4)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 33f8524  [FLINK-17996][table-planner-blink] Fix NPE in CatalogTableStatisticsConverter.convertToColumnStats method
     new 87afb9e  [FLINK-18035][runtime] Use fixed thread pool
     new 752828c  Revert "[FLINK-17558][runtime] Add Executors#newCachedThreadPool"
     new 4ee5470  [FLINK-18008][runtime] HistoryServer logs environment info
     new 4ff98f4  [FLINK-18010][runtime] Expand HistoryServer logging

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/webmonitor/history/HistoryServer.java  |  3 ++
 .../history/HistoryServerArchiveFetcher.java       | 11 +++-
 .../apache/flink/runtime/concurrent/Executors.java | 27 ----------
 .../runtime/taskexecutor/TaskManagerRunner.java    |  4 +-
 .../flink/runtime/concurrent/ExecutorsTest.java    | 63 ----------------------
 5 files changed, 15 insertions(+), 93 deletions(-)
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java


[flink] 01/04: [FLINK-18035][runtime] Use fixed thread pool

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 87afb9e08b7a3fb846eaa406a139646096450f50
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 29 14:11:47 2020 +0200

    [FLINK-18035][runtime] Use fixed thread pool
---
 .../java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 9772700..82d2451 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -30,7 +30,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
@@ -74,6 +73,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -367,7 +367,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
 			resourceID,
 			taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
 
-		final ExecutorService ioExecutor = Executors.newCachedThreadPool(
+		final ExecutorService ioExecutor = Executors.newFixedThreadPool(
 			taskManagerServicesConfiguration.getNumIoThreads(),
 			new ExecutorThreadFactory("flink-taskexecutor-io"));
 


[flink] 04/04: [FLINK-18010][runtime] Expand HistoryServer logging

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ff98f44f9344c0fb6529ad99a250237d4cd85c9
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu May 28 15:37:26 2020 +0200

    [FLINK-18010][runtime] Expand HistoryServer logging
---
 .../webmonitor/history/HistoryServerArchiveFetcher.java       | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index fd6069a..5a41173 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -186,11 +186,13 @@ class HistoryServerArchiveFetcher {
 		@Override
 		public void run() {
 			try {
+				LOG.debug("Starting archive fetching.");
 				List<ArchiveEvent> events = new ArrayList<>();
 				Set<String> jobsToRemove = new HashSet<>(cachedArchives);
 				for (HistoryServer.RefreshLocation refreshLocation : refreshDirs) {
 					Path refreshDir = refreshLocation.getPath();
 					FileSystem refreshFS = refreshLocation.getFs();
+					LOG.debug("Checking archive directory {}.", refreshDir);
 
 					// contents of /:refreshDir
 					FileStatus[] jobArchives;
@@ -214,7 +216,11 @@ class HistoryServerArchiveFetcher {
 							continue;
 						}
 						jobsToRemove.remove(jobID);
-						if (!cachedArchives.contains(jobID)) {
+
+						if (cachedArchives.contains(jobID)) {
+							LOG.trace("Ignoring archive {} because it was already fetched.", jobArchivePath);
+						} else {
+							LOG.info("Processing archive {}.", jobArchivePath);
 							try {
 								for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) {
 									String path = archive.getPath();
@@ -224,6 +230,7 @@ class HistoryServerArchiveFetcher {
 									if (path.equals(JobsOverviewHeaders.URL)) {
 										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 									} else if (path.equals("/joboverview")) { // legacy path
+										LOG.debug("Migrating legacy archive {}", jobArchivePath);
 										json = convertLegacyJobOverview(json);
 										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 									} else {
@@ -253,6 +260,7 @@ class HistoryServerArchiveFetcher {
 								}
 								events.add(new ArchiveEvent(jobID, ArchiveEventType.CREATED));
 								cachedArchives.add(jobID);
+								LOG.info("Processing archive {} finished.", jobArchivePath);
 							} catch (IOException e) {
 								LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
 								// Make sure we do not include this job in the overview
@@ -281,6 +289,7 @@ class HistoryServerArchiveFetcher {
 					updateJobOverview(webOverviewDir, webDir);
 				}
 				events.forEach(jobArchiveEventListener::accept);
+				LOG.debug("Finished archive fetching.");
 			} catch (Exception e) {
 				LOG.error("Critical failure while fetching/processing job archives.", e);
 			}


[flink] 02/04: Revert "[FLINK-17558][runtime] Add Executors#newCachedThreadPool"

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 752828c07e7deb58fea2a5a8969758901dcf6c87
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri May 29 14:12:08 2020 +0200

    Revert "[FLINK-17558][runtime] Add Executors#newCachedThreadPool"
    
    This reverts commit 90b8455d08eda7a6a55f5cc952fa1adf3a48ff96.
---
 .../apache/flink/runtime/concurrent/Executors.java | 27 ----------
 .../flink/runtime/concurrent/ExecutorsTest.java    | 63 ----------------------
 2 files changed, 90 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index c758752..41d9a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -18,14 +18,8 @@
 
 package org.apache.flink.runtime.concurrent;
 
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
-
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import scala.concurrent.ExecutionContext;
 
@@ -67,27 +61,6 @@ public class Executors {
 	}
 
 	/**
-	 * Returns a new cached thread pool with the desired maximum size.
-	 *
-	 * <p>This method is a variation of {@link java.util.concurrent.Executors#newFixedThreadPool(int, ThreadFactory)},
-	 * with the minimum pool size set to 0.
-	 * In that respect it is similar to {@link java.util.concurrent.Executors#newCachedThreadPool()}, but it uses a
-	 * {@link LinkedBlockingQueue} instead to allow tasks to be queued, instead of failing with an exception if the pool
-	 * is saturated.
-	 *
-	 * @see ExecutorThreadFactory
-	 * @param maxPoolSize maximum size of the thread pool
-	 * @param threadFactory thread factory to use
-	 * @return new cached thread pool
-	 */
-	public static ExecutorService newCachedThreadPool(int maxPoolSize, ThreadFactory threadFactory) {
-		return new ThreadPoolExecutor(0, maxPoolSize,
-			60L, TimeUnit.SECONDS,
-			new LinkedBlockingQueue<>(),
-			threadFactory);
-	}
-
-	/**
 	 * Direct execution context.
 	 */
 	private static class DirectExecutionContext implements ExecutionContext {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
deleted file mode 100644
index e3be776..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ExecutorsTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-import org.apache.flink.core.testutils.BlockerSync;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.testutils.executor.TestExecutorResource;
-
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadFactory;
-
-/**
- * Tests for {@link Executors}.
- */
-public class ExecutorsTest {
-
-	@Rule
-	public final TestExecutorResource executorResource = new TestExecutorResource(
-		() -> Executors.newCachedThreadPool(1, new ExecutorThreadFactory()));
-
-	/**
-	 * Tests that the {@link ExecutorService} returned by {@link Executors#newCachedThreadPool(int, ThreadFactory)}
-	 * allows tasks to be queued. In a prior implementation the executor used a synchronous queue, rejecting tasks with
-	 * an exception if no thread was available to process it.
-	 */
-	@Test
-	public void testNewCachedThreadPoolDoesNotRejectTasksExceedingActiveThreadCount() throws InterruptedException {
-		Executor executor = executorResource.getExecutor();
-
-		BlockerSync sync = new BlockerSync();
-		try {
-			// submit the first blocking task, which should block the single pool thread
-			executor.execute(sync::blockNonInterruptible);
-
-			// the thread is now blocked
-			sync.awaitBlocker();
-
-			// this task should not be rejected
-			executor.execute(sync::blockNonInterruptible);
-		} finally {
-			sync.releaseBlocker();
-		}
-	}
-}


[flink] 03/04: [FLINK-18008][runtime] HistoryServer logs environment info

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ee5470344dd30ea8135afc86c7df2fbb5b08159
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu May 28 15:10:28 2020 +0200

    [FLINK-18008][runtime] HistoryServer logs environment info
---
 .../org/apache/flink/runtime/webmonitor/history/HistoryServer.java     | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 3dd2d47..dda88ca 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.router.Router;
 import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.ExceptionUtils;
@@ -104,6 +105,8 @@ public class HistoryServer {
 	private final Thread shutdownHook;
 
 	public static void main(String[] args) throws Exception {
+		EnvironmentInformation.logEnvironmentInfo(LOG, "HistoryServer", args);
+
 		ParameterTool pt = ParameterTool.fromArgs(args);
 		String configDir = pt.getRequired("configDir");