Posted to issues@flink.apache.org by beyond1920 <gi...@git.apache.org> on 2016/09/21 09:40:26 UTC

[GitHub] flink pull request #2524: [FLINK-4653] [Client] Refactor JobClientActor to a...

GitHub user beyond1920 opened a pull request:

    https://github.com/apache/flink/pull/2524

    [FLINK-4653] [Client] Refactor JobClientActor to adapt to new Rpc framework and new cluster management

    The main changes are:
    1. Create an RpcEndpoint (temporarily named JobInfoTracker) and an RpcGateway (temporarily named JobInfoTrackerGateway) to replace the old JobClientActor.
    2. Replace the message-based communication in JobClientActor with RPC method calls to fit the new RPC framework.
    3. JobInfoTracker waits for job state changes and the job result until the job completes.
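
Changes 2 and 3 can be illustrated with a minimal, self-contained sketch: each message the old actor received becomes a method on a gateway interface, and the tracker completes a result future once the job reaches a terminal state. All names here (TrackerGateway, Tracker, JobState and their methods) are hypothetical and are not the Flink RPC API from this PR; java.util.concurrent.CompletableFuture stands in for the Scala futures the real code uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class JobTrackerSketch {

    /** Hypothetical job states; not Flink's JobStatus enum. */
    enum JobState { CREATED, RUNNING, FINISHED, FAILED, CANCELED }

    /** Hypothetical gateway: each former actor message becomes a method call. */
    interface TrackerGateway {
        void jobStateChanged(JobState newState);
        void jobResultAvailable(String result);
        CompletableFuture<String> getJobResult();
    }

    /** Hypothetical tracker that records state and completes the result future. */
    static class Tracker implements TrackerGateway {
        private final CompletableFuture<String> resultFuture = new CompletableFuture<>();
        private volatile JobState state = JobState.CREATED;

        @Override
        public void jobStateChanged(JobState newState) {
            state = newState;
            // A failed or canceled job will never deliver a result, so fail the future.
            if (newState == JobState.FAILED || newState == JobState.CANCELED) {
                resultFuture.completeExceptionally(
                    new IllegalStateException("Job ended in state " + newState));
            }
        }

        @Override
        public void jobResultAvailable(String result) {
            resultFuture.complete(result);
        }

        @Override
        public CompletableFuture<String> getJobResult() {
            return resultFuture;
        }

        JobState currentState() {
            return state;
        }
    }

    public static void main(String[] args) throws Exception {
        Tracker tracker = new Tracker();
        // Direct calls here; a real endpoint would receive these remotely via the RPC service.
        tracker.jobStateChanged(JobState.RUNNING);
        tracker.jobStateChanged(JobState.FINISHED);
        tracker.jobResultAvailable("result-42");
        System.out.println(tracker.getJobResult().get(1, TimeUnit.SECONDS));
    }
}
```

Running main prints result-42. In an RPC-based endpoint these methods would presumably be invoked through a gateway proxy rather than called directly as they are here.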

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/alibaba/flink jira-4653

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2524.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2524
    
----
commit 5959e94604142784341ba76fb06cccd32473676b
Author: beyond1920 <be...@126.com>
Date:   2016-09-21T06:00:14Z

    jobClient refactor
    
    Summary: refactor jobClient to adapt to the new cluster management
    
    Test Plan: junit
    
    Reviewers: #blink, kete.yangkt
    
    Differential Revision: http://phabricator.taobao.net/D5816

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2524: [FLINK-4653] [Client] Refactor JobClientActor to a...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 closed the pull request at:

    https://github.com/apache/flink/pull/2524



[GitHub] flink issue #2524: [FLINK-4653] [Client] Refactor JobClientActor to adapt to...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/2524
  
    @mxm, I have already rebased the PR. Thanks.



[GitHub] flink issue #2524: [FLINK-4653] [Client] Refactor JobClientActor to adapt to...

Posted by beyond1920 <gi...@git.apache.org>.
Github user beyond1920 commented on the issue:

    https://github.com/apache/flink/pull/2524
  
    @mxm, thanks for your review. I made the following changes based on your advice:
    1. The changes now compile successfully; sorry for the basic mistake.
    2. The awaitJobResult method in JobClientUtils now retries upon TimeoutException until the JobInfoTracker appears to be dead.
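
Point 2 can be sketched as follows, assuming some liveness check for the tracker is available: the loop keeps retrying on TimeoutException and only gives up once the tracker no longer appears alive. The resultSupplier and trackerAlive parameters and TrackerDeadException are hypothetical stand-ins for the JobInfoTracker calls, not code from the PR, and CompletableFuture replaces the Scala Await/Future pair used in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public class AwaitWithLivenessSketch {

    /** Hypothetical exception signalling that the tracker seems to be gone. */
    static class TrackerDeadException extends RuntimeException {
        TrackerDeadException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Waits for a result, retrying on TimeoutException for as long as
     * trackerAlive reports true. Both suppliers are hypothetical stand-ins
     * for calls on the JobInfoTracker.
     */
    static <T> T awaitResult(Supplier<CompletableFuture<T>> resultSupplier,
                             BooleanSupplier trackerAlive,
                             long pollTimeoutMillis) {
        while (true) {
            // Request a fresh future on each attempt, mirroring the loop in the diff.
            CompletableFuture<T> future = resultSupplier.get();
            try {
                return future.get(pollTimeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // A timeout alone is not fatal; stop only if the tracker looks dead.
                if (!trackerAlive.getAsBoolean()) {
                    throw new TrackerDeadException("Tracker appears to be dead, giving up", e);
                }
            } catch (ExecutionException e) {
                // The call itself failed: surface the cause as an unchecked exception.
                throw new CompletionException(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        }
    }
}
```

The loop still runs forever if the liveness check keeps reporting true while no result ever arrives, so its termination depends entirely on how reliably "dead" can be detected.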



[GitHub] flink pull request #2524: [FLINK-4653] [Client] Refactor JobClientActor to a...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2524#discussion_r80441549
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobClient/JobClientUtils.java ---
    @@ -0,0 +1,257 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobClient;
    +
    +import akka.actor.ActorSystem;
    +import akka.actor.Address;
    +import akka.util.Timeout;
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.akka.ListeningBehaviour;
    +import org.apache.flink.runtime.blob.BlobCache;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.client.JobExecutionException;
    +import org.apache.flink.runtime.client.JobRetrievalException;
    +import org.apache.flink.runtime.jobmaster.JobMasterGateway;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.util.NetUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.Option;
    +import scala.Some;
    +import scala.Tuple2;
    +import scala.concurrent.Await;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.IOException;
    +import java.net.InetAddress;
    +import java.net.InetSocketAddress;
    +import java.net.URL;
    +import java.net.URLClassLoader;
    +import java.util.List;
    +import java.util.concurrent.TimeoutException;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * JobClientUtils is a utility class for the client.
    + * It offers the following methods:
    + * <ul>
    + *     <li>{@link #startJobClientRpcService(Configuration)} Starts an RPC service for the client</li>
    + *     <li>{@link #retrieveRunningJobResult(JobID, JobMasterGateway, RpcService, LeaderRetrievalService, boolean, FiniteDuration, Configuration)}
    + *          Attaches to a running job using the JobID and waits for its job result</li>
    + *     <li>{@link #awaitJobResult(JobInfoTracker, ClassLoader)} Awaits the result of the job execution that the jobInfoTracker listens for</li>
    + *     <li>{@link #retrieveClassLoader(JobID, JobMasterGateway, Configuration)} Reconstructs the class loader by first requesting information about it at the JobMaster
    + *          and then downloading missing jar files</li>
    + * </ul>
    + */
    +public class JobClientUtils {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(JobClientUtils.class);
    +
    +
    +	/**
    +	 * Starts an RPC service for the client.
    +	 *
    +	 * @param config the flink configuration
    +	 * @return the started RPC service
    +	 * @throws IOException
    +	 */
    +	public static RpcService startJobClientRpcService(Configuration config)
    +		throws IOException
    +	{
    +		LOG.info("Starting JobClientUtils rpc service");
    +		Option<Tuple2<String, Object>> remoting = new Some<>(new Tuple2<String, Object>("", 0));
    +
    +		// start a remote actor system to listen on an arbitrary port
    +		ActorSystem system = AkkaUtils.createActorSystem(config, remoting);
    +		Address address = system.provider().getDefaultAddress();
    +
    +		String hostAddress = address.host().isDefined() ?
    +			NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) :
    +			"(unknown)";
    +		int port = address.port().isDefined() ? ((Integer) address.port().get()) : -1;
    +		LOG.info("Started JobClientUtils actor system at " + hostAddress + ':' + port);
    +
    +		Timeout timeout = new Timeout(AkkaUtils.getClientTimeout(config));
    +		return new AkkaRpcService(system, timeout);
    +	}
    +
    +	/**
    +	 * Attaches to a running job using the JobID and waits for its job result.
    +	 *
    +	 * @param jobID                  ID of the job
    +	 * @param jobMasterGateway       gateway to the JobMaster
    +	 * @param rpcService             RPC service used to run the JobInfoTracker
    +	 * @param leaderRetrievalService leader retrieval service of the JobMaster
    +	 * @param sysoutLogUpdates       whether status messages shall be printed to sysout
    +	 * @param timeout                timeout for registering the client at the JobMaster
    +	 * @param configuration          the flink configuration
    +	 * @return the result of the job execution
    +	 * @throws JobExecutionException
    +	 */
    +	public static JobExecutionResult retrieveRunningJobResult(
    +		JobID jobID,
    +		JobMasterGateway jobMasterGateway,
    +		RpcService rpcService,
    +		LeaderRetrievalService leaderRetrievalService,
    +		boolean sysoutLogUpdates,
    +		FiniteDuration timeout,
    +		Configuration configuration) throws JobExecutionException
    +	{
    +
    +		checkNotNull(jobID, "The jobID must not be null.");
    +		checkNotNull(jobMasterGateway, "The jobMasterGateway must not be null.");
    +		checkNotNull(rpcService, "The rpcService must not be null.");
    +		checkNotNull(leaderRetrievalService, "The leaderRetrievalService must not be null.");
    +		checkNotNull(timeout, "The timeout must not be null");
    +		checkNotNull(configuration, "The configuration must not be null");
    +
    +		JobInfoTracker jobInfoTracker = null;
    +		try {
    +			jobInfoTracker = new JobInfoTracker(rpcService, leaderRetrievalService, jobID, sysoutLogUpdates);
    +			jobInfoTracker.start();
    +			registerClientAtJobMaster(jobID, jobInfoTracker.getAddress(), jobMasterGateway, timeout);
    +			ClassLoader classLoader = retrieveClassLoader(jobID, jobMasterGateway, configuration);
    +			return awaitJobResult(jobInfoTracker, classLoader);
    +		} finally {
    +			if (jobInfoTracker != null) {
    +				jobInfoTracker.shutDown();
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Awaits the result of the job execution that the jobInfoTracker listens for.
    +	 *
    +	 * @param jobInfoTracker job info tracker
    +	 * @param classLoader    classloader to parse the job result
    +	 * @return the result of the job execution
    +	 * @throws JobExecutionException
    +	 */
    +	public static JobExecutionResult awaitJobResult(JobInfoTracker jobInfoTracker,
    +		ClassLoader classLoader) throws JobExecutionException
    +	{
    +		try {
    +			while (true) {
    +				Future<JobExecutionResult> jobExecutionResultFuture = jobInfoTracker.getJobExecutionResult(classLoader);
    +				try {
    +					JobExecutionResult jobExecutionResult = Await.result(jobExecutionResultFuture, new Timeout(AkkaUtils.INF_TIMEOUT()).duration());
    +					return jobExecutionResult;
    +				} catch (TimeoutException e) {
    +					// ignore timeout exception, retry
    --- End diff --
    
    I'm not sure whether this could cause an infinite loop if connection timeouts keep recurring.
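
One way to address this concern, sketched under the assumption that an overall time budget is acceptable for the client: keep the per-attempt timeout, but stop retrying once the total budget is used up, so recurring timeouts end in an exception instead of an endless loop. The method awaitWithDeadline and its parameters perAttemptMillis and totalMillis are hypothetical names, and CompletableFuture stands in for the Scala futures in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class BoundedRetrySketch {

    /**
     * Retries a call on TimeoutException until it succeeds or the overall
     * deadline passes. All names are hypothetical, not Flink API.
     */
    static <T> T awaitWithDeadline(Supplier<CompletableFuture<T>> call,
                                   long perAttemptMillis,
                                   long totalMillis) throws TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalMillis);
        int attempts = 0;
        while (true) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                // Budget exhausted: report failure instead of looping forever.
                throw new TimeoutException(
                    "No result after " + attempts + " attempts within " + totalMillis + " ms");
            }
            attempts++;
            // Never wait past the overall deadline on a single attempt.
            long waitNanos = Math.min(remainingNanos, TimeUnit.MILLISECONDS.toNanos(perAttemptMillis));
            try {
                return call.get().get(waitNanos, TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                // Retry; the deadline check above ends the loop eventually.
            } catch (ExecutionException e) {
                throw new CompletionException(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        }
    }
}
```

Whether to bound the wait by time, by a number of attempts, or by a liveness signal from the JobInfoTracker is a design choice: a long-running streaming job may legitimately produce no result for far longer than any fixed budget, so a pure deadline may not suit a client that attaches to such jobs.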



[GitHub] flink issue #2524: [FLINK-4653] [Client] Refactor JobClientActor to adapt to...

Posted by mxm <gi...@git.apache.org>.
Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2524
  
    Thanks for the PR @beyond1920. We just rebased the flip-6 branch. Could you rebase the PR?

