Posted to user@flink.apache.org by bastien dine <ba...@gmail.com> on 2018/11/23 15:32:09 UTC
Multiple env.execute() into one Flink batch job
Hello,
I need to chain processing in the DataSet API, so I am launching several jobs,
with multiple env.execute() calls:

topology1.define();
env.execute();

topology2.define();
env.execute();
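In full, a minimal self-contained version of what I am doing looks roughly
like this (class and topology names simplified):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class ChainedBatchJob {
  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // topology1.define(): a first DataSet program, ending in a sink
    DataSet<Integer> first = env.fromElements(1, 2, 3);
    first.output(new DiscardingOutputFormat<>());
    env.execute("topology-1"); // should block until the first job finishes

    // topology2.define(): a second, independent DataSet program
    DataSet<String> second = env.fromElements("a", "b", "c");
    second.output(new DiscardingOutputFormat<>());
    env.execute("topology-2");
  }
}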
This works fine when I am running it within IntelliJ,
but when I deploy it to my cluster, it only launches the first
topology..
Could you please shed some light on this issue?
Regards,
Bastien
Re: Multiple env.execute() into one Flink batch job
Posted by Flavio Pompermaier <po...@okkam.it>.
We solved this issue (of reading the value of an accumulator) by calling a
REST endpoint after the job ends, in order to store the value associated with
the accumulator in some database.
This is awful, but I didn't find any better solution..
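To make it concrete, this is the kind of driver-side code one would like to
have (a minimal sketch; the accumulator name and the pipeline are made up):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;

public class AccumulatorReadSketch {
  public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3)
        .map(new CountingMapper())
        .output(new DiscardingOutputFormat<>());
    final JobExecutionResult result = env.execute("accumulator-sketch");
    // This part runs only where main() runs (e.g. in the CLI client process);
    // with REST API submission it is never executed -- hence our workaround.
    final Long processed = result.getAccumulatorResult("processed-records");
    System.out.println("processed-records = " + processed);
  }

  private static class CountingMapper extends RichMapFunction<Integer, Integer> {
    private final LongCounter counter = new LongCounter();

    @Override
    public void open(Configuration parameters) {
      getRuntimeContext().addAccumulator("processed-records", counter);
    }

    @Override
    public Integer map(Integer value) {
      counter.add(1L);
      return value * 2;
    }
  }
}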
This is the code that runs the job (of course it's not complete, but it could
help you to get some insight):
You need to import the following Java lib first:
<dependency>
  <groupId>org.apache.sshd</groupId>
  <artifactId>sshd-core</artifactId>
  <version>2.1.0</version>
</dependency>
-------------------- FlinkSshJobRun.java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import it.okkam.datalinks.job.api.config.FlinkProperties; // package as in FlinkProperties.java below
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.common.config.keys.FilePasswordProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlinkSshJobRun implements Runnable {

  private static final Logger LOG = LoggerFactory.getLogger(FlinkSshJobRun.class);

  private final Integer sshPort;
  private final String sshUser;
  private final String sshKeyPassword;
  private final String command;
  private final Integer sshTimeoutMillis;
  private final String targetHost;
  private final Charset encoding;

  public FlinkSshJobRun(FlinkProperties flinkProps, String jarFile, String entryClass,
      String argsStr, Integer parallelism) throws MalformedURLException {
    final URL jobManagerUrl = new URL(flinkProps.getJobManagerUrl());
    this.targetHost = jobManagerUrl.getHost();
    this.sshUser = flinkProps.getSsh().getUser();
    this.sshPort = flinkProps.getSsh().getPort();
    this.sshKeyPassword = flinkProps.getSsh().getKeyPassword();
    this.sshTimeoutMillis = flinkProps.getSsh().getTimeoutMs();
    String flinkBinClient = flinkProps.getSsh().getFlinkDistDir() + "/bin/flink";
    if (parallelism != null) {
      flinkBinClient += " -p " + parallelism;
    }
    final String flinkUploadDir = flinkProps.getSsh().getFlinkJarUploadDir();
    final Path jarPathOnServer = Paths.get(flinkUploadDir, jarFile);
    // Build the bin/flink run command that will be executed on the job manager host.
    this.command = flinkBinClient + " run -c " + entryClass
        + " " + jarPathOnServer
        + " " + (argsStr == null ? "" : argsStr);
    this.encoding = StandardCharsets.UTF_8;
  }

  @Override
  public void run() {
    ClientSession session = null;
    final ByteArrayOutputStream stdErr = new ByteArrayOutputStream();
    final ByteArrayOutputStream stdOut = new ByteArrayOutputStream();
    try (SshClient cl = SshClient.setUpDefaultClient()) {
      // No key passphrase by default; use the configured one if present.
      cl.setFilePasswordProvider(FilePasswordProvider.EMPTY);
      if (sshKeyPassword != null) {
        cl.setFilePasswordProvider(file -> sshKeyPassword);
      }
      cl.start();
      session = cl.connect(sshUser, targetHost, sshPort)
          .verify(sshTimeoutMillis)
          .getSession();
      // Give authentication more headroom than the connect timeout.
      session.auth().verify(Math.multiplyExact(sshTimeoutMillis, 4));
      LOG.info("Executing SSH: {}@{}:{} -> {}", sshUser, targetHost, sshPort, command);
      session.executeRemoteCommand(command, stdOut, stdErr, encoding);
      LOG.info("SSH successfully executed {}@{}:{} -> {}", sshUser, targetHost, sshPort, command);
      final String stdOutTxt = getOsContentOnException(true, stdOut);
      LOG.info("SSH stdout for {}@{}:{} -> {}\n{}", sshUser, targetHost, sshPort, command,
          stdOutTxt);
    } catch (IOException ex) {
      final String stdOutTxt = getOsContentOnException(true, stdOut);
      final String stdErrTxt = getOsContentOnException(false, stdErr);
      final String errorMsg = String.format(
          "Error during SSH execution %s@%s:%s -> %s%nSTDOUT:%s%n%nSTDERR:%s",
          sshUser, targetHost, sshPort, command, stdOutTxt, stdErrTxt);
      LOG.error(errorMsg, ex);
    } finally {
      try {
        stdErr.close();
      } catch (IOException ex) {
        LOG.error("Error during STDERR buffer close", ex);
      }
      try {
        stdOut.close();
      } catch (IOException ex) {
        LOG.error("Error during STDOUT buffer close", ex);
      }
      if (session != null) {
        try {
          session.close();
        } catch (IOException ex) {
          LOG.error("Error during SSH session close", ex);
        }
      }
    }
  }

  private String getOsContentOnException(boolean stdOut, final ByteArrayOutputStream os) {
    final String targetOutStream = stdOut ? "STDOUT" : "STDERR";
    String ret = String.format("<Error while retrieving %s>", targetOutStream);
    try {
      ret = os.toString(StandardCharsets.UTF_8.name());
    } catch (UnsupportedEncodingException ex2) {
      LOG.error("Error while reading " + targetOutStream + " after exception", ex2);
    }
    return ret;
  }
}
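For completeness, a minimal standalone usage of the class above would look
something like this (all values made up; in our setup the Runnable is handed
to a Spring TaskExecutor instead, see the REST service below):

public static void main(String[] args) throws Exception {
  FlinkProperties props = new FlinkProperties();
  props.setJobManagerUrl("http://jobmanager.example.com:8081");
  props.getSsh().setUser("flink");
  props.getSsh().setPort(22);
  props.getSsh().setTimeoutMs(10000);
  props.getSsh().setFlinkDistDir("/opt/flink");
  props.getSsh().setFlinkJarUploadDir("/opt/flink/uploaded-jars");
  // keyPassword stays null when the SSH key has no passphrase
  new Thread(new FlinkSshJobRun(props, "my-job.jar", "com.example.MyJob",
      "--foo bar", 4)).start();
}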
-------------------- FlinkProperties.java
package it.okkam.datalinks.job.api.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "flink", ignoreUnknownFields = true)
public class FlinkProperties {

  private String jobManagerUrl;
  private final Ssh ssh = new Ssh();

  public String getJobManagerUrl() {
    return jobManagerUrl;
  }

  public void setJobManagerUrl(String jobManagerUrl) {
    this.jobManagerUrl = jobManagerUrl;
  }

  public Ssh getSsh() {
    return ssh;
  }

  public static class Async {
    private Integer corePoolSize;
    private Integer maxPoolSize;
    private Integer queueCapacity;

    public Integer getCorePoolSize() {
      return corePoolSize;
    }

    public void setCorePoolSize(final Integer corePoolSize) {
      this.corePoolSize = corePoolSize;
    }

    public Integer getMaxPoolSize() {
      return maxPoolSize;
    }

    public void setMaxPoolSize(final Integer maxPoolSize) {
      this.maxPoolSize = maxPoolSize;
    }

    public Integer getQueueCapacity() {
      return queueCapacity;
    }

    public void setQueueCapacity(final Integer queueCapacity) {
      this.queueCapacity = queueCapacity;
    }
  }

  public static class Ssh {
    private Integer port;
    private Integer timeoutMs;
    private String user;
    private String keyPassword;
    private String flinkDistDir;
    private String flinkJarUploadDir;
    private final Async async = new Async();

    public Async getAsync() {
      return async;
    }

    public Integer getPort() {
      return port;
    }

    public void setPort(Integer port) {
      this.port = port;
    }

    public String getUser() {
      return user;
    }

    public void setUser(String user) {
      this.user = user;
    }

    public Integer getTimeoutMs() {
      return timeoutMs;
    }

    public void setTimeoutMs(Integer timeoutMs) {
      this.timeoutMs = timeoutMs;
    }

    public String getFlinkDistDir() {
      return flinkDistDir;
    }

    public void setFlinkDistDir(String flinkDistDir) {
      this.flinkDistDir = flinkDistDir;
    }

    public String getFlinkJarUploadDir() {
      return flinkJarUploadDir;
    }

    public void setFlinkJarUploadDir(String flinkJarUploadDir) {
      this.flinkJarUploadDir = flinkJarUploadDir;
    }

    public String getKeyPassword() {
      return keyPassword;
    }

    public void setKeyPassword(String keyPassword) {
      this.keyPassword = keyPassword;
    }
  }
}
-------------------- REST service (via Spring Boot)
private TaskExecutor taskExecutor;

@Override
@GetMapping(METHOD_JOB_RUN + "/{" + PARAMS_JARFILE + ":.+}")
@ResponseStatus(HttpStatus.OK)
public JobRun runJob(
    @PathVariable(name = PARAMS_JARFILE) String jarFile,
    @RequestParam(name = "entry-class", required = false) String entryClass,
    @RequestParam(name = "program-args", required = false) String argsStr,
    @RequestParam(name = "parallelism", required = false) Integer parallelism)
    throws IOException {
  final long start = System.currentTimeMillis();
  // A real job id could be returned if the Flink REST API worked as expected..
  final String jobId = "OK";
  // Run the SSH submission asynchronously so this endpoint returns immediately.
  taskExecutor.execute(
      new FlinkSshJobRun(flinkProperties, jarFile, entryClass, argsStr, parallelism));
  final long elapsed = System.currentTimeMillis() - start;
  return new JobRun(elapsed, null, jobId);
}
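Assuming, for example, that METHOD_JOB_RUN is "/jobs/run" and PARAMS_JARFILE
is "jarFile" (both constants are defined elsewhere in our code), the endpoint
is invoked like:

GET /jobs/run/my-job.jar?entry-class=com.example.MyJob&parallelism=4

The call returns immediately; the actual bin/flink run happens asynchronously
on the TaskExecutor thread, which is why the returned jobId is just a
placeholder.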
On Fri, Nov 23, 2018 at 4:47 PM bastien dine <ba...@gmail.com> wrote:
> Oh god, if we have some code using an Accumulator after the env.execute(),
> will that not be executed on the JobManager either?
> Thanks, I would be interested indeed!
>
> ------------------
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
On Fri, Nov 23, 2018 at 4:37 PM, Flavio Pompermaier <po...@okkam.it>
wrote:
>
>> The problem is that the REST API blocks on env.execute().
>> If you want to run your Flink job you have to submit it using the CLI
>> client.
>> As a workaround we wrote a Spring REST API that, to run a job, opens an SSH
>> connection to the job manager and executes the bin/flink run command..
>>
>> If you're interested, I can share some code..
>>
>>
>>
>> On Fri, Nov 23, 2018 at 4:32 PM bastien dine <ba...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I need to chain processing in the DataSet API, so I am launching several
>>> jobs, with multiple env.execute() calls:
>>>
>>> topology1.define();
>>> env.execute();
>>>
>>> topology2.define();
>>> env.execute();
>>>
>>> This works fine when I am running it within IntelliJ,
>>> but when I deploy it to my cluster, it only launches the first
>>> topology..
>>>
>>> Could you please shed some light on this issue?
>>>
>>> Regards,
>>> Bastien
>>>
>>
>>
>>
--
Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809
Re: Multiple env.execute() into one Flink batch job
Posted by bastien dine <ba...@gmail.com>.
Oh god, if we have some code using an Accumulator after the env.execute(),
will that not be executed on the JobManager either?
Thanks, I would be interested indeed!
------------------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io
On Fri, Nov 23, 2018 at 4:37 PM, Flavio Pompermaier <po...@okkam.it>
wrote:
> The problem is that the REST API blocks on env.execute().
> If you want to run your Flink job you have to submit it using the CLI
> client.
> As a workaround we wrote a Spring REST API that, to run a job, opens an SSH
> connection to the job manager and executes the bin/flink run command..
>
> If you're interested, I can share some code..
>
>
>
> On Fri, Nov 23, 2018 at 4:32 PM bastien dine <ba...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I need to chain processing in the DataSet API, so I am launching several
>> jobs, with multiple env.execute() calls:
>>
>> topology1.define();
>> env.execute();
>>
>> topology2.define();
>> env.execute();
>>
>> This works fine when I am running it within IntelliJ,
>> but when I deploy it to my cluster, it only launches the first
>> topology..
>>
>> Could you please shed some light on this issue?
>>
>> Regards,
>> Bastien
>>
>
>
>
Re: Multiple env.execute() into one Flink batch job
Posted by Flavio Pompermaier <po...@okkam.it>.
The problem is that the REST API blocks on env.execute().
If you want to run your Flink job you have to submit it using the CLI
client.
As a workaround we wrote a Spring REST API that, to run a job, opens an SSH
connection to the job manager and executes the bin/flink run command..
If you're interested, I can share some code..
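To be clear, by CLI submission I mean something along these lines (paths and
class names made up):

/opt/flink/bin/flink run -c com.example.MyJob /path/to/my-job.jar

When main() runs in the CLI client, env.execute() blocks there and the code
between and after the execute() calls runs as expected.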
On Fri, Nov 23, 2018 at 4:32 PM bastien dine <ba...@gmail.com> wrote:
> Hello,
>
> I need to chain processing in the DataSet API, so I am launching several
> jobs, with multiple env.execute() calls:
>
> topology1.define();
> env.execute();
>
> topology2.define();
> env.execute();
>
> This works fine when I am running it within IntelliJ,
> but when I deploy it to my cluster, it only launches the first
> topology..
>
> Could you please shed some light on this issue?
>
> Regards,
> Bastien
>