Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2016/08/10 21:59:25 UTC

[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/2353

    [FLINK-4355] [cluster management] Implement TaskManager side of registration at ResourceManager.

    This implements the first part of the `TaskManager` (here temporarily called `TaskExecutor` to avoid name clashes) as designed in FLIP-6.
    
    Specifically, it accepts a leader retrieval service for the `ResourceManager` and triggers the registration at the ResourceManager.
    
    Tests are pending; this pull request is intended for early review.
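A minimal sketch of the flow described above (the leader retrieval service reports a ResourceManager leader, and the TaskExecutor triggers a registration at it). It assumes a registration can be modeled as a `CompletableFuture` that is canceled when leadership moves. `ResourceManagerLeaderListener`, `notifyLeaderAddress` and the `registrationStarter` function are hypothetical names chosen for illustration, not the classes in this pull request; a `null` address stands for "no leader currently known".

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

// Hypothetical listener, called whenever the leader retrieval service
// reports a (possibly new) ResourceManager leader.
class ResourceManagerLeaderListener {

    // Starts a registration at (address, leaderId); the future completes with the gateway.
    private final BiFunction<String, UUID, CompletableFuture<String>> registrationStarter;

    // The registration currently in flight, if any.
    private CompletableFuture<String> pendingRegistration;

    ResourceManagerLeaderListener(BiFunction<String, UUID, CompletableFuture<String>> registrationStarter) {
        this.registrationStarter = registrationStarter;
    }

    // Synchronized because leader notifications may arrive on arbitrary threads.
    synchronized void notifyLeaderAddress(String address, UUID leaderId) {
        // Registering at the old leader is pointless once leadership has moved.
        if (pendingRegistration != null && !pendingRegistration.isDone()) {
            pendingRegistration.cancel(false);
        }
        pendingRegistration = (address == null) ? null : registrationStarter.apply(address, leaderId);
    }

    synchronized CompletableFuture<String> pendingRegistration() {
        return pendingRegistration;
    }
}
```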

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink taskmanager_register

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2353
    
----
commit f1078b72f4b4b9d1549d55905f61fefd0663e9be
Author: Stephan Ewen <se...@apache.org>
Date:   2016-08-10T18:42:45Z

    [FLINK-4355] [cluster management] Implement TaskManager side of registration at ResourceManager.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

Posted by wenlong88 <gi...@git.apache.org>.
Github user wenlong88 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2353#discussion_r74533793
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.taskexecutor;
    +
    +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
    +
    +import java.util.UUID;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +public class TaskExecutorToResourceManagerConnection {
    +
    +	private final TaskExecutor taskExecutor;
    +
    +	private final ResourceManagerGateway resourceManager;
    +
    +	private final UUID resourceManagerLeaderId;
    +
    +	private final String resourceManagerAddress;
    +
    +	public TaskExecutorToResourceManagerConnection(
    --- End diff --
    
    I think we could have a `HARPCGateway` that extends `HAService` and can be reused by other components.


[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen closed the pull request at:

    https://github.com/apache/flink/pull/2353


[GitHub] flink issue #2353: [FLINK-4355] [cluster management] Implement TaskManager s...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/2353
  
    Really good code @StephanEwen :-) I like it a lot that you've pulled out the registration logic from the `TaskExecutor`. This makes it much cleaner and easier to test.
    
    I'm wondering whether we can take it a step further. I've noticed that we have to pass the `TaskExecutor` to different places to have access to its state and to run things in the main thread context. I'm not sure whether this is really necessary.
    
    The registration information should be more or less constant, and the main thing that has to happen in the main thread context is setting the connected gateway. So wouldn't it be possible for the `ResourceManagerRegistration` to be something like a cancelable future that is instantiated with all necessary information? It would perform the registration logic completely in the background, and once it completes we can create a `TaskExecutorToResourceManagerConnection` in the main thread context. Whenever we have to start a new registration, we cancel the future (if another registration is still running).
    
    Pulling out the `TaskExecutor` from the registration process has the advantage that the components are even further separated and, thus, easier to test.
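A minimal sketch of that split, assuming the main thread can be modeled as a single-threaded `ExecutorService`: the registration future completes on whatever thread finishes the RPC, and only the final step, creating the connection, is handed to the main thread. If the future is canceled because a newer registration was started, that step never runs. `TaskExecutorSketch` and `ResourceManagerConnection` are hypothetical stand-ins for `TaskExecutor` and `TaskExecutorToResourceManagerConnection`, and the gateway is reduced to a `String`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for TaskExecutorToResourceManagerConnection.
class ResourceManagerConnection {
    final String gateway;

    ResourceManagerConnection(String gateway) {
        this.gateway = gateway;
    }
}

// Hypothetical stand-in for the TaskExecutor.
class TaskExecutorSketch {
    // Single thread that owns all mutable TaskExecutor state (the "main thread").
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // Only read or written from mainThread.
    private ResourceManagerConnection connection;

    // The registration itself runs in the background without a TaskExecutor reference;
    // only installing the connection hops onto the main thread.
    CompletableFuture<Void> onRegistration(CompletableFuture<String> registration) {
        return registration.thenAcceptAsync(
                gateway -> connection = new ResourceManagerConnection(gateway),
                mainThread);
    }

    // Reads the field on the main thread, so no extra synchronization is needed.
    CompletableFuture<ResourceManagerConnection> currentConnection() {
        return CompletableFuture.supplyAsync(() -> connection, mainThread);
    }

    void shutdown() {
        mainThread.shutdown();
    }
}
```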


[GitHub] flink issue #2353: [FLINK-4355] [cluster management] Implement TaskManager s...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2353
  
    I like that idea.
    
    Getting the TaskExecutor out of the registration means that we cannot transmit the slot report immediately with each registration call (it only makes sense when each attempt grabs the latest slot report).
    
    That might not be too bad for the TaskExecutor, as it can always just eagerly transmit a slot report (such as when responding to a heartbeat).
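A sketch of the trade-off described above, in which the registration payload stays constant (so it can be built once and reused by every retry) and the slot report travels with heartbeat responses instead. The message classes and `SlotReportingTaskExecutor` are hypothetical, and slots are reduced to strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

// Hypothetical messages; the actual RPC signatures in the pull request differ.
final class RegistrationRequest {
    final String taskExecutorAddress;
    final UUID resourceId;

    RegistrationRequest(String taskExecutorAddress, UUID resourceId) {
        this.taskExecutorAddress = taskExecutorAddress;
        this.resourceId = resourceId;
    }
}

final class HeartbeatResponse {
    final List<String> slotReport;

    HeartbeatResponse(List<String> slotReport) {
        this.slotReport = slotReport;
    }
}

class SlotReportingTaskExecutor {
    private final RegistrationRequest registrationRequest;
    private final Supplier<List<String>> currentSlots;

    SlotReportingTaskExecutor(String address, UUID resourceId, Supplier<List<String>> currentSlots) {
        // Constant registration payload: built once and reused by every attempt,
        // so the registration logic needs no access to live TaskExecutor state.
        this.registrationRequest = new RegistrationRequest(address, resourceId);
        this.currentSlots = currentSlots;
    }

    RegistrationRequest registrationRequest() {
        return registrationRequest;
    }

    // The slot report is sampled when the heartbeat is answered, so the
    // ResourceManager receives the state as of that moment.
    HeartbeatResponse onHeartbeat() {
        return new HeartbeatResponse(Collections.unmodifiableList(new ArrayList<>(currentSlots.get())));
    }
}
```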


[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2353#discussion_r74802315
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/ResourceManagerRegistration.java ---
    @@ -0,0 +1,252 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.taskexecutor;
    +
    +import akka.dispatch.OnFailure;
    +import akka.dispatch.OnSuccess;
    +
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
    +import org.apache.flink.runtime.rpc.resourcemanager.TaskExecutorRegistrationResponse;
    +
    +import org.slf4j.Logger;
    +
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +
    +/**
    + * This utility class handles the registration of the TaskExecutor at the ResourceManager.
    + * It implements the initial address resolution and the retries-with-backoff strategy.
    + * 
    + * <p>The registration process notifies its {@link TaskExecutorToResourceManagerConnection}
    + * upon successful registration. The registration can be canceled, for example when the
    + * ResourceManager that it tries to register at loses leader status.
    + * 
    + * <p>Implementation note: This class does not act like a single-threaded actor.
    + * It holds only constant state and passes all variable state via stack and closures.
    + */
    +public class ResourceManagerRegistration {
    +
    +	// ------------------------------------------------------------------------
    +	//  configuration constants
    +	// ------------------------------------------------------------------------
    +
    +	private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
    +
    +	private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
    +
    +	private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
    +
    +	private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
    +
    +	// ------------------------------------------------------------------------
    +
    +	private final Logger log;
    +
    +	private final TaskExecutorToResourceManagerConnection connection;
    +
    +	private final TaskExecutor taskExecutor;
    +
    +	private final String resourceManagerAddress;
    +
    +	private final UUID resourceManagerLeaderId;
    +
    +	private volatile boolean canceled;
    +
    +	public ResourceManagerRegistration(
    +			Logger log,
    +			TaskExecutorToResourceManagerConnection connection,
    +			TaskExecutor taskExecutor,
    +			String resourceManagerAddress,
    +			UUID resourceManagerLeaderId) {
    +
    +		this.log = checkNotNull(log);
    +		this.connection = checkNotNull(connection);
    +		this.taskExecutor = checkNotNull(taskExecutor);
    +		this.resourceManagerAddress = checkNotNull(resourceManagerAddress);
    +		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  cancellation
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * Cancels the registration procedure.
    +	 */
    +	public void cancel() {
    +		canceled = true;
    +	}
    +
    +	/**
    +	 * Checks if the registration was canceled.
    +	 * @return True if the registration was canceled, false otherwise.
    +	 */
    +	public boolean isCanceled() {
    +		return canceled;
    +	}
    +
    +	// ------------------------------------------------------------------------
    +	//  registration
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * This method resolves the ResourceManager address to a callable gateway and starts the
    +	 * registration after that.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public void resolveResourceManagerAndStartRegistration() {
    +		final RpcService rpcService = taskExecutor.getRpcService();
    +
    +		// trigger resolution of the resource manager address to a callable gateway
    +		Future<ResourceManagerGateway> resourceManagerFuture =
    +				rpcService.connect(resourceManagerAddress, ResourceManagerGateway.class);
    +
    +		// upon success, start the registration attempts
    +		resourceManagerFuture.onSuccess(new OnSuccess<ResourceManagerGateway>() {
    +			@Override
    +			public void onSuccess(ResourceManagerGateway result) {
    +				log.info("Resolved ResourceManager address, beginning registration");
    +				register(result, 1, INITIAL_REGISTRATION_TIMEOUT_MILLIS);
    +			}
    +		}, taskExecutor.getMainThreadExecutionContext());
    +
    +		// upon failure, retry, unless this is cancelled
    +		resourceManagerFuture.onFailure(new OnFailure() {
    +			@Override
    +			public void onFailure(Throwable failure) {
    +				if (!isCanceled()) {
    +					log.warn("Could not resolve ResourceManager address {}, retrying...", resourceManagerAddress);
    +					resolveResourceManagerAndStartRegistration();
    +				}
    +			}
    +		}, rpcService.getRpcExecutionContext());
    +	}
    +
    +	/**
    +	 * This method performs a registration attempt and triggers either a success notification or a retry,
    +	 * depending on the result.
    +	 */
    +	@SuppressWarnings("unchecked")
    +	private void register(final ResourceManagerGateway resourceManager, final int attempt, final long timeoutMillis) {
    +		// this needs to run in the TaskExecutor's main thread
    +		taskExecutor.validateRunsInMainThread();
    +
    +		// eager check for canceling to avoid some unnecessary work
    +		if (canceled) {
    +			return;
    +		}
    +
    +		log.info("Registration at ResourceManager attempt {} (timeout={}ms)", attempt, timeoutMillis);
    +
    +		FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS);
    +
    +		Future<TaskExecutorRegistrationResponse> registrationFuture = resourceManager.registerTaskExecutor(
    +				resourceManagerLeaderId, taskExecutor.getAddress(), taskExecutor.getResourceID(),
    +				taskExecutor.getCurrentSlotReport(), timeout);
    --- End diff --
    
    Do we have to get the current slot report for each registration attempt? Isn't this something which can be out of date anyway when it arrives at the RM? The heartbeat should consolidate the state in such a case, shouldn't it?
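The quoted excerpt of `ResourceManagerRegistration` ends before its retry logic, so how the timeout grows between attempts is not visible here. Assuming the common doubling-with-cap scheme, starting from `INITIAL_REGISTRATION_TIMEOUT_MILLIS` (100 ms) and capped at `MAX_REGISTRATION_TIMEOUT_MILLIS` (30000 ms), the per-attempt timeouts can be computed as follows. `RegistrationBackoff` is a hypothetical helper, not part of the pull request.

```java
import java.util.ArrayList;
import java.util.List;

class RegistrationBackoff {
    // Values copied from the constants in the diff.
    static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
    static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;

    // Timeout for the next attempt, assuming doubling with an upper bound.
    static long nextTimeout(long currentTimeoutMillis) {
        return Math.min(2 * currentTimeoutMillis, MAX_REGISTRATION_TIMEOUT_MILLIS);
    }

    // Timeouts used by the first `attempts` registration attempts.
    static List<Long> timeouts(int attempts) {
        List<Long> result = new ArrayList<>();
        long timeout = INITIAL_REGISTRATION_TIMEOUT_MILLIS;
        for (int i = 0; i < attempts; i++) {
            result.add(timeout);
            timeout = nextTimeout(timeout);
        }
        return result;
    }
}
```

Under that assumption, the first nine attempts use 100, 200, 400, ..., 25600 ms, and from the tenth attempt on every attempt uses the 30000 ms cap.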


[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2353#discussion_r74803435
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.taskexecutor;
    +
    +import org.apache.flink.runtime.instance.InstanceID;
    +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
    +import org.slf4j.Logger;
    +
    +import java.util.UUID;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +public class TaskExecutorToResourceManagerConnection {
    +
    +	private final Logger log;
    +
    +	private final TaskExecutor taskExecutor;
    +
    +	private final UUID resourceManagerLeaderId;
    +
    +	private final String resourceManagerAddress;
    +
    +	private ResourceManagerRegistration pendingRegistration;
    +
    +	private ResourceManagerGateway registeredResourceManager;
    +
    +	private InstanceID registrationId;
    +
    +	private volatile boolean closed;
    +
    +
    +	public TaskExecutorToResourceManagerConnection(
    +			Logger log,
    --- End diff --
    
    I'm not so sure whether passing a logger from a different context makes the logging statements easier or harder to understand. I usually have trouble locating logging statements (and thus the state of the computation) when they are logged under a different (wider) scope than the one in which they actually happened.
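To illustrate the alternative to passing a `Logger` into the connection and registration classes: each class can declare its own logger named after itself, so every log line identifies the component that emitted it. This sketch uses `java.util.logging` only so that it compiles without dependencies; with SLF4J, as used in the diff, the equivalent is `LoggerFactory.getLogger(SomeClass.class)`. The class names are hypothetical.

```java
import java.util.logging.Logger;

// Each component owns a logger named after its own class, instead of
// receiving the enclosing TaskExecutor's logger as a constructor argument.
class RegistrationComponent {
    static final Logger LOG = Logger.getLogger(RegistrationComponent.class.getName());

    void attempt(int attempt) {
        LOG.info("Registration attempt " + attempt);
    }
}

class ConnectionComponent {
    static final Logger LOG = Logger.getLogger(ConnectionComponent.class.getName());
}
```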


[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2353#discussion_r74793706
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.taskexecutor;
    +
    +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
    +
    +import java.util.UUID;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +public class TaskExecutorToResourceManagerConnection {
    --- End diff --
    
    Agreed, let's factor it out as a common utility and do that as part of the next component that needs it.


[GitHub] flink issue #2353: [FLINK-4355] [cluster management] Implement TaskManager s...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2353
  
    I addressed the comments by @tillrohrmann and @wenlong88:
      - I turned the registration object into a reusable utility that can also be used for the JobMaster registration.
      - I made the registration object behave more like a cancelable future. That way, it does not need to use the TaskExecutor's main-thread execution context for its retries.
    
    This actually looks pretty nice now. I will merge this so that others can build on top of it, and will provide tests ASAP.
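A minimal sketch of a registration utility along the lines described above: generic over the gateway type so that both the TaskExecutor and the JobMaster could use it, exposed as a cancelable future, and retrying on a scheduler rather than on the TaskExecutor's main thread. `RetryingRegistration` and its parameters are hypothetical, not the class that was merged. As a simplification, every failure (timeout, error, or decline) is retried immediately with a doubled, capped timeout, whereas the earlier diff declares separate delays for errors and refusals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical generic registration: G is the gateway being registered at
// (ResourceManager, JobMaster, ...), S is the success payload.
class RetryingRegistration<G, S> {

    private final G gateway;
    private final Function<G, CompletableFuture<S>> attempt; // one registration RPC
    private final ScheduledExecutorService scheduler;
    private final long initialTimeoutMillis;
    private final long maxTimeoutMillis;
    private final CompletableFuture<S> result = new CompletableFuture<>();

    RetryingRegistration(G gateway, Function<G, CompletableFuture<S>> attempt,
            ScheduledExecutorService scheduler, long initialTimeoutMillis, long maxTimeoutMillis) {
        this.gateway = gateway;
        this.attempt = attempt;
        this.scheduler = scheduler;
        this.initialTimeoutMillis = initialTimeoutMillis;
        this.maxTimeoutMillis = maxTimeoutMillis;
    }

    /** Starts the first attempt; the returned future completes on the first success. */
    CompletableFuture<S> start() {
        scheduler.execute(() -> register(initialTimeoutMillis));
        return result;
    }

    /** Stops further retries; the future returned by start() becomes canceled. */
    void cancel() {
        result.cancel(false);
    }

    private void register(long timeoutMillis) {
        if (result.isDone()) {
            return; // canceled, or an earlier attempt already succeeded
        }
        CompletableFuture<S> attemptFuture = attempt.apply(gateway);
        // Fail the attempt if it does not answer within its timeout.
        ScheduledFuture<?> timeoutTask = scheduler.schedule(
                () -> attemptFuture.completeExceptionally(new TimeoutException()),
                timeoutMillis, TimeUnit.MILLISECONDS);

        attemptFuture.whenComplete((success, failure) -> {
            timeoutTask.cancel(false);
            if (failure == null) {
                result.complete(success);
            } else if (!result.isDone()) {
                long next = Math.min(2 * timeoutMillis, maxTimeoutMillis);
                scheduler.execute(() -> register(next));
            }
        });
    }
}
```

A caller would start() the registration when a leader is reported, cancel() it when leadership moves, and install the connection from the completed future on its own main thread.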


[GitHub] flink pull request #2353: [FLINK-4355] [cluster management] Implement TaskMa...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2353#discussion_r74800246
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/TaskExecutorRegistrationResponse.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc.resourcemanager;
    +
    +import org.apache.flink.runtime.instance.InstanceID;
    +
    +/**
    + * Base class for responses from the ResourceManager to a registration attempt by a
    + * TaskExecutor.
    + */
    +public abstract class TaskExecutorRegistrationResponse {
    +
    +	/**
    +	 * Successful registration.
    +	 */
    +	public static final class Success extends TaskExecutorRegistrationResponse {
    +
    +		private final InstanceID registrationId;
    +
    +		private final long heartbeatInterval;
    +
    +		public Success(InstanceID registrationId, long heartbeatInterval) {
    +			this.registrationId = registrationId;
    +			this.heartbeatInterval = heartbeatInterval;
    +		}
    +
    +		public InstanceID getRegistrationId() {
    +			return registrationId;
    +		}
    +
    +		public long getHeartbeatInterval() {
    +			return heartbeatInterval;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')';
    +		}
    +	}
    +
    +	// ----------------------------------------------------------------------------
    +
    +	/**
    +	 * A rejected (declined) registration.
    +	 */
    +	public static final class Decline extends TaskExecutorRegistrationResponse {
    +
    +		private final String reason;
    +
    +		public Decline(String reason) {
    +			this.reason = reason;
    +		}
    +
    +		public String getReason() {
    +			return reason;
    +		}
    +
    +		@Override
    +		public String toString() {
    +			return "TaskExecutorRegistrationDecline (" + reason + ')';
    +		}
    +	}
    +}
    +
    +
    +
    +
    +
    +
    +
    --- End diff --
    
    Too many trailing line breaks.
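The `TaskExecutorRegistrationResponse` hierarchy in the diff above only defines the response types; a caller would typically branch on the concrete subclass. A minimal sketch follows, using a trimmed copy of the hierarchy (with a `String` standing in for Flink's `InstanceID` so it compiles on its own). The retry policy is an assumption based on the `REFUSED_REGISTRATION_DELAY_MILLIS` constant in `ResourceManagerRegistration`, and `ResponseHandler` with its return convention is hypothetical.

```java
import java.util.OptionalLong;

// Trimmed, self-contained copy of the response hierarchy from the diff.
abstract class RegistrationResponse {

    static final class Success extends RegistrationResponse {
        final String registrationId;
        final long heartbeatInterval;

        Success(String registrationId, long heartbeatInterval) {
            this.registrationId = registrationId;
            this.heartbeatInterval = heartbeatInterval;
        }
    }

    static final class Decline extends RegistrationResponse {
        final String reason;

        Decline(String reason) {
            this.reason = reason;
        }
    }
}

// Hypothetical consumer of a registration response.
class ResponseHandler {
    static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;

    String registrationId; // set once a registration succeeds
    long heartbeatInterval;

    /** Returns the delay before the next attempt, or empty if registration succeeded. */
    OptionalLong handle(RegistrationResponse response) {
        if (response instanceof RegistrationResponse.Success) {
            RegistrationResponse.Success success = (RegistrationResponse.Success) response;
            registrationId = success.registrationId;
            heartbeatInterval = success.heartbeatInterval;
            return OptionalLong.empty();
        }
        if (response instanceof RegistrationResponse.Decline) {
            // Assumed policy: a declined registration is retried after a fixed delay.
            return OptionalLong.of(REFUSED_REGISTRATION_DELAY_MILLIS);
        }
        throw new IllegalArgumentException("Unknown registration response: " + response);
    }
}
```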


[GitHub] flink issue #2353: [FLINK-4355] [cluster management] Implement TaskManager s...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2353
  
    Manually merged into the `flip-6` feature branch in https://github.com/apache/flink/commit/68addf39e0e5f9e1656818f923be362680ed93b0

