You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/03/18 15:50:20 UTC

[flink] branch master updated (b83060d -> 405cf8f)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b83060d  [FLINK-16606][python] Throw exceptions for the data types which are not currently supported
     new cf7cc89  [FLINK-16373] Make JobManagerLeaderListener thread safe
     new 3cd1eab  [hotfix] Minor cleanups in JobLeaderService
     new 9a07945  [hotfix] Add JavaDoc to RegisteredRpcConnection#tryReconnect
     new b4c0f2d  [FLINK-16635] Remove pinned dependency for okio and okhttp from flink-metrics-influxdb
     new 405cf8f  [FLINK-16635] Bump influxdb-java version to 2.17

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-metrics/flink-metrics-influxdb/pom.xml       |  17 +--
 .../src/main/resources/META-INF/NOTICE             |  14 +--
 .../registration/RegisteredRpcConnection.java      |   7 ++
 .../runtime/taskexecutor/JobLeaderService.java     | 136 +++++++++++----------
 .../runtime/taskexecutor/JobLeaderServiceTest.java | 116 ++++++++++++++++++
 5 files changed, 204 insertions(+), 86 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/JobLeaderServiceTest.java


[flink] 01/05: [FLINK-16373] Make JobManagerLeaderListener thread safe

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf7cc89ac99554dc51da5cbffc3b76fe32ef5fea
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Mar 4 18:01:18 2020 +0100

    [FLINK-16373] Make JobManagerLeaderListener thread safe
    
    The JobManagerLeaderListener used by the JobLeaderService was not thread-safe. Stopping the listener
    while it was being notified of a new leader could lead to an IllegalStateException, because the
    rpcConnection that was about to be started was concurrently closed by the stop call.
    
    This closes #11313.
---
 .../runtime/taskexecutor/JobLeaderService.java     | 130 +++++++++++----------
 .../runtime/taskexecutor/JobLeaderServiceTest.java | 116 ++++++++++++++++++
 2 files changed, 186 insertions(+), 60 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index eed6f11..f89585e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -40,9 +40,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -228,20 +231,26 @@ public class JobLeaderService {
 	/**
 	 * Leader listener which tries to establish a connection to a newly detected job leader.
 	 */
+	@ThreadSafe
 	private final class JobManagerLeaderListener implements LeaderRetrievalListener {
 
+		private final Object lock = new Object();
+
 		/** Job id identifying the job to look for a leader. */
 		private final JobID jobId;
 
 		/** Rpc connection to the job leader. */
-		private volatile RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+		@GuardedBy("lock")
+		private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+
+		/** Leader id of the current job leader. */
+		@GuardedBy("lock")
+		@Nullable
+		private JobMasterId currentJobMasterId;
 
 		/** State of the listener. */
 		private volatile boolean stopped;
 
-		/** Leader id of the current job leader. */
-		private volatile JobMasterId currentJobMasterId;
-
 		private JobManagerLeaderListener(JobID jobId) {
 			this.jobId = Preconditions.checkNotNull(jobId);
 
@@ -250,91 +259,92 @@ public class JobLeaderService {
 			currentJobMasterId = null;
 		}
 
+		private JobMasterId getCurrentJobMasterId() {
+			synchronized (lock) {
+				return currentJobMasterId;
+			}
+		}
+
 		public void stop() {
-			stopped = true;
+			synchronized (lock) {
+				if (!stopped) {
+					stopped = true;
 
-			if (rpcConnection != null) {
-				rpcConnection.close();
+					if (rpcConnection != null) {
+						rpcConnection.close();
+					}
+				}
 			}
 		}
 
 		public void reconnect() {
-			if (stopped) {
-				LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
-			} else {
-				final RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> currentRpcConnection = rpcConnection;
-
-				if (currentRpcConnection != null) {
-					if (currentRpcConnection.isConnected()) {
-
-						if (currentRpcConnection.tryReconnect()) {
-							// double check for concurrent stop operation
-							if (stopped) {
-								currentRpcConnection.close();
-							}
-						} else {
-							LOG.debug("Could not reconnect to the JobMaster {}.", currentRpcConnection.getTargetAddress());
-						}
+			synchronized (lock) {
+				if (stopped) {
+					LOG.debug("Cannot reconnect because the JobManagerLeaderListener has already been stopped.");
+				} else {
+					if (rpcConnection != null) {
+						Preconditions.checkState(
+							rpcConnection.tryReconnect(),
+							"Illegal concurrent modification of the JobManagerLeaderListener rpc connection.");
 					} else {
-						LOG.debug("Ongoing registration to JobMaster {}.", currentRpcConnection.getTargetAddress());
+						LOG.debug("Cannot reconnect to an unknown JobMaster.");
 					}
-				} else {
-					LOG.debug("Cannot reconnect to an unknown JobMaster.");
 				}
 			}
 		}
 
 		@Override
 		public void notifyLeaderAddress(final @Nullable String leaderAddress, final @Nullable UUID leaderId) {
-			if (stopped) {
-				LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
-					"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
-			} else {
-				final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);
-
-				LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
-					jobId, leaderAddress, jobMasterId);
-
-				if (leaderAddress == null || leaderAddress.isEmpty()) {
-					// the leader lost leadership but there is no other leader yet.
-					if (rpcConnection != null) {
-						rpcConnection.close();
-					}
+			Optional<JobMasterId> jobManagerLostLeadership = Optional.empty();
 
-					jobLeaderListener.jobManagerLostLeadership(jobId, currentJobMasterId);
-
-					currentJobMasterId = jobMasterId;
+			synchronized (lock) {
+				if (stopped) {
+					LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
+						"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
 				} else {
-					currentJobMasterId = jobMasterId;
+					final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);
 
-					if (rpcConnection != null) {
-						// check if we are already trying to connect to this leader
-						if (!Objects.equals(jobMasterId, rpcConnection.getTargetLeaderId())) {
+					LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
+						jobId, leaderAddress, jobMasterId);
+
+					if (leaderAddress == null || leaderAddress.isEmpty()) {
+						// the leader lost leadership but there is no other leader yet.
+						if (rpcConnection != null) {
 							rpcConnection.close();
+						}
 
+						jobManagerLostLeadership = Optional.ofNullable(currentJobMasterId);
+						currentJobMasterId = jobMasterId;
+					} else {
+						currentJobMasterId = jobMasterId;
+
+						if (rpcConnection != null) {
+							// check if we are already trying to connect to this leader
+							if (!Objects.equals(jobMasterId, rpcConnection.getTargetLeaderId())) {
+								rpcConnection.close();
+
+								rpcConnection = new JobManagerRegisteredRpcConnection(
+									LOG,
+									leaderAddress,
+									jobMasterId,
+									rpcService.getExecutor());
+							}
+						} else {
 							rpcConnection = new JobManagerRegisteredRpcConnection(
 								LOG,
 								leaderAddress,
 								jobMasterId,
 								rpcService.getExecutor());
 						}
-					} else {
-						rpcConnection = new JobManagerRegisteredRpcConnection(
-							LOG,
-							leaderAddress,
-							jobMasterId,
-							rpcService.getExecutor());
-					}
 
-					// double check for a concurrent stop operation
-					if (stopped) {
-						rpcConnection.close();
-					} else {
 						LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId);
 						rpcConnection.start();
 					}
 				}
 			}
+
+			// send callbacks outside of the lock scope
+			jobManagerLostLeadership.ifPresent(oldJobMasterId -> jobLeaderListener.jobManagerLostLeadership(jobId, oldJobMasterId));
 		}
 
 		@Override
@@ -378,7 +388,7 @@ public class JobLeaderService {
 			@Override
 			protected void onRegistrationSuccess(JMTMRegistrationSuccess success) {
 				// filter out old registration attempts
-				if (Objects.equals(getTargetLeaderId(), currentJobMasterId)) {
+				if (Objects.equals(getTargetLeaderId(), getCurrentJobMasterId())) {
 					log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), jobId);
 
 					jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), success);
@@ -390,7 +400,7 @@ public class JobLeaderService {
 			@Override
 			protected void onRegistrationFailure(Throwable failure) {
 				// filter out old registration attempts
-				if (Objects.equals(getTargetLeaderId(), currentJobMasterId)) {
+				if (Objects.equals(getTargetLeaderId(), getCurrentJobMasterId())) {
 					log.info("Failed to register at job  manager {} for job {}.", getTargetAddress(), jobId);
 					jobLeaderListener.handleError(failure);
 				} else {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/JobLeaderServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/JobLeaderServiceTest.java
new file mode 100644
index 0000000..51595e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/JobLeaderServiceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
+import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Tests for the {@link JobLeaderService}.
+ */
+public class JobLeaderServiceTest extends TestLogger {
+
+	@ClassRule
+	public static final TestingRpcServiceResource RPC_SERVICE_RESOURCE = new TestingRpcServiceResource();
+
+	/**
+	 * Tests that we can concurrently modify the JobLeaderService and complete the leader retrieval operation.
+	 * See FLINK-16373.
+	 */
+	@Test
+	public void handlesConcurrentJobAdditionsAndLeaderChanges() throws Exception {
+		final JobLeaderService jobLeaderService = new JobLeaderService(
+			new LocalTaskManagerLocation(),
+			RetryingRegistrationConfiguration.defaultConfiguration());
+
+		final TestingJobLeaderListener jobLeaderListener = new TestingJobLeaderListener();
+		final int numberOperations = 20;
+		final BlockingQueue<SettableLeaderRetrievalService> instantiatedLeaderRetrievalServices = new ArrayBlockingQueue<>(numberOperations);
+
+		final HighAvailabilityServices haServices = new TestingHighAvailabilityServicesBuilder()
+			.setJobMasterLeaderRetrieverFunction(
+				leaderForJobId -> {
+					final SettableLeaderRetrievalService leaderRetrievalService = new SettableLeaderRetrievalService();
+					instantiatedLeaderRetrievalServices.offer(leaderRetrievalService);
+					return leaderRetrievalService;
+				})
+			.build();
+
+		jobLeaderService.start(
+			"foobar",
+			RPC_SERVICE_RESOURCE.getTestingRpcService(),
+			haServices,
+			jobLeaderListener);
+
+		final CheckedThread addJobAction = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				for (int i = 0; i < numberOperations; i++) {
+					final JobID jobId = JobID.generate();
+					jobLeaderService.addJob(jobId, "foobar");
+					Thread.yield();
+					jobLeaderService.removeJob(jobId);
+				}
+			}
+		};
+		addJobAction.start();
+
+		for (int i = 0; i < numberOperations; i++) {
+			final SettableLeaderRetrievalService leaderRetrievalService = instantiatedLeaderRetrievalServices.take();
+			leaderRetrievalService.notifyListener("foobar", UUID.randomUUID());
+		}
+
+		addJobAction.sync();
+	}
+
+	private static final class TestingJobLeaderListener implements JobLeaderListener {
+
+		@Override
+		public void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage) {
+			// ignored
+		}
+
+		@Override
+		public void jobManagerLostLeadership(JobID jobId, JobMasterId jobMasterId) {
+			// ignored
+		}
+
+		@Override
+		public void handleError(Throwable throwable) {
+			// ignored
+		}
+	}
+}


[flink] 03/05: [hotfix] Add JavaDoc to RegisteredRpcConnection#tryReconnect

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a0794528c98612f4dfcc723529419fdb992918f
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Mar 17 15:08:07 2020 +0100

    [hotfix] Add JavaDoc to RegisteredRpcConnection#tryReconnect
---
 .../apache/flink/runtime/registration/RegisteredRpcConnection.java | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index 8c78ede..b257e17 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -100,6 +100,13 @@ public abstract class RegisteredRpcConnection<F extends Serializable, G extends
 		}
 	}
 
+	/**
+	 * Tries to reconnect to the {@link #targetAddress} by cancelling the pending registration
+	 * and starting a new pending registration.
+	 *
+	 * @return {@code false} if the connection has been closed or a concurrent modification has happened;
+	 * otherwise {@code true}
+	 */
 	public boolean tryReconnect() {
 		checkState(isConnected(), "Cannot reconnect to an unknown destination.");
 


[flink] 02/05: [hotfix] Minor cleanups in JobLeaderService

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3cd1eabee8b23bc903d5dbf0b04df39d4e634c50
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Mar 4 18:08:13 2020 +0100

    [hotfix] Minor cleanups in JobLeaderService
---
 .../org/apache/flink/runtime/taskexecutor/JobLeaderService.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index f89585e..b444cd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -294,7 +294,7 @@ public class JobLeaderService {
 		}
 
 		@Override
-		public void notifyLeaderAddress(final @Nullable String leaderAddress, final @Nullable UUID leaderId) {
+		public void notifyLeaderAddress(@Nullable final String leaderAddress, @Nullable final UUID leaderId) {
 			Optional<JobMasterId> jobManagerLostLeadership = Optional.empty();
 
 			synchronized (lock) {
@@ -446,8 +446,8 @@ public class JobLeaderService {
 		@Override
 		protected CompletableFuture<RegistrationResponse> invokeRegistration(
 				JobMasterGateway gateway,
-				JobMasterId jobMasterId,
-				long timeoutMillis) throws Exception {
+				JobMasterId fencingToken,
+				long timeoutMillis) {
 			return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, Time.milliseconds(timeoutMillis));
 		}
 	}


[flink] 05/05: [FLINK-16635] Bump influxdb-java version to 2.17

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 405cf8f429c6a5031c21597abe9193bedcb8e15b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Mar 17 14:38:09 2020 +0100

    [FLINK-16635] Bump influxdb-java version to 2.17
    
    This commit bumps influxdb-java version from 2.16 to 2.17. This resolves a
    dependency convergence problem within the influxdb-java dependency.
    
    This closes #11428.
---
 flink-metrics/flink-metrics-influxdb/pom.xml                 |  2 +-
 .../src/main/resources/META-INF/NOTICE                       | 12 ++++++------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/flink-metrics/flink-metrics-influxdb/pom.xml b/flink-metrics/flink-metrics-influxdb/pom.xml
index 1d6289a..71cfdbf 100644
--- a/flink-metrics/flink-metrics-influxdb/pom.xml
+++ b/flink-metrics/flink-metrics-influxdb/pom.xml
@@ -57,7 +57,7 @@ under the License.
 		<dependency>
 			<groupId>org.influxdb</groupId>
 			<artifactId>influxdb-java</artifactId>
-			<version>2.16</version>
+			<version>2.17</version>
 		</dependency>
 
 		<!-- test dependencies -->
diff --git a/flink-metrics/flink-metrics-influxdb/src/main/resources/META-INF/NOTICE b/flink-metrics/flink-metrics-influxdb/src/main/resources/META-INF/NOTICE
index 2cca565..cfd39b4 100644
--- a/flink-metrics/flink-metrics-influxdb/src/main/resources/META-INF/NOTICE
+++ b/flink-metrics/flink-metrics-influxdb/src/main/resources/META-INF/NOTICE
@@ -6,13 +6,13 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- com.squareup.moshi:moshi:1.5.0
-- com.squareup.okhttp3:logging-interceptor:3.11.0
-- com.squareup.okhttp3:okhttp:3.14.3
+- com.squareup.moshi:moshi:1.8.0
+- com.squareup.okhttp3:logging-interceptor:3.14.4
+- com.squareup.okhttp3:okhttp:3.14.4
 - com.squareup.okio:okio:1.17.2
-- com.squareup.retrofit2:converter-moshi:2.4.0
-- com.squareup.retrofit2:retrofit:2.4.0
+- com.squareup.retrofit2:converter-moshi:2.6.2
+- com.squareup.retrofit2:retrofit:2.6.2
 
 This project bundles the following dependencies under the MIT license. (https://opensource.org/licenses/MIT)
 
-- org.influxdb:influxdb-java:2.14
+- org.influxdb:influxdb-java:2.17


[flink] 04/05: [FLINK-16635] Remove pinned dependency for okio and okhttp from flink-metrics-influxdb

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b4c0f2db5ecc8a0677763c1f822d5fdc46a52597
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Mar 17 12:11:43 2020 +0100

    [FLINK-16635] Remove pinned dependency for okio and okhttp from flink-metrics-influxdb
    
    With FLINK-12147 we bumped the influxdb-java version from 2.14 to 2.16. However, okio and
    okhttp were still pinned to versions that are incompatible with it. This commit removes the
    dependency management entries for these dependencies so that the influxdb reporter bundles
    the correct dependencies.
---
 flink-metrics/flink-metrics-influxdb/pom.xml              | 15 ---------------
 .../src/main/resources/META-INF/NOTICE                    |  4 ++--
 2 files changed, 2 insertions(+), 17 deletions(-)

diff --git a/flink-metrics/flink-metrics-influxdb/pom.xml b/flink-metrics/flink-metrics-influxdb/pom.xml
index 81d90cd..1d6289a 100644
--- a/flink-metrics/flink-metrics-influxdb/pom.xml
+++ b/flink-metrics/flink-metrics-influxdb/pom.xml
@@ -89,21 +89,6 @@ under the License.
 		</dependency>
 	</dependencies>
 
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>com.squareup.okhttp3</groupId>
-				<artifactId>okhttp</artifactId>
-				<version>3.11.0</version>
-			</dependency>
-			<dependency>
-				<groupId>com.squareup.okio</groupId>
-				<artifactId>okio</artifactId>
-				<version>1.14.0</version>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
-
 	<build>
 		<plugins>
 			<plugin>
diff --git a/flink-metrics/flink-metrics-influxdb/src/main/resources/META-INF/NOTICE b/flink-metrics/flink-metrics-influxdb/src/main/resources/META-INF/NOTICE
index ad79869..2cca565 100644
--- a/flink-metrics/flink-metrics-influxdb/src/main/resources/META-INF/NOTICE
+++ b/flink-metrics/flink-metrics-influxdb/src/main/resources/META-INF/NOTICE
@@ -8,8 +8,8 @@ This project bundles the following dependencies under the Apache Software Licens
 
 - com.squareup.moshi:moshi:1.5.0
 - com.squareup.okhttp3:logging-interceptor:3.11.0
-- com.squareup.okhttp3:okhttp:3.11.0
-- com.squareup.okio:okio:1.14.0
+- com.squareup.okhttp3:okhttp:3.14.3
+- com.squareup.okio:okio:1.17.2
 - com.squareup.retrofit2:converter-moshi:2.4.0
 - com.squareup.retrofit2:retrofit:2.4.0