Posted to user@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2016/08/25 08:24:07 UTC

Delaying starting the jobmanager in yarn?

Hi,

We have a situation where we need to start a Flink batch job on a YARN
cluster the moment an event arrives over a queue.
These events occur at a very low rate (like once or twice a week).

The idea we have is to run an application that listens to the queue and
executes the batch when it receives a message.

We found that when we start this using 'flink run -m yarn-cluster ...', the
JobManager in YARN is started and the resources for these batches are
claimed immediately.

What is the recommended way to only claim these resources when we actually
have a job to run?
Can we 'manually' start and stop the JobManager in YARN in some way from
our Java code?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
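
A minimal sketch of the trigger application described above, assuming a
plain BlockingQueue as a stand-in for the real message queue; launchBatchJob
is a hypothetical placeholder (one possible body is the YarnClusterDescriptor
example further down this page):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchTrigger {

   // Stand-in for the real queue consumer (JMS, Kafka, ...).
   private static final BlockingQueue<String> EVENTS = new LinkedBlockingQueue<>();

   public static void main(String[] args) throws Exception {
      while (true) {
         // Block until an event arrives; no cluster resources should be
         // held while waiting, which is the point of the question above.
         String event = EVENTS.take();
         launchBatchJob(event);
      }
   }

   // Hypothetical placeholder: start the Flink batch job on YARN,
   // claiming resources only at this moment.
   private static void launchBatchJob(String event) throws Exception {
      // ...
   }
}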

Re: Delaying starting the jobmanager in yarn?

Posted by Maximilian Michels <mx...@apache.org>.
You too!

On Fri, Aug 26, 2016 at 4:15 PM, Niels Basjes <Ni...@basjes.nl> wrote:
> Thanks!
> I'm going to work with this next week.
>
> Have a nice weekend.
>
> Niels
>
> On Fri, Aug 26, 2016 at 2:49 PM, Maximilian Michels <mx...@apache.org> wrote:
>>
>> It is a bit more involved than I thought. We could simplify the API further:
>>
>> import org.apache.flink.client.program.PackagedProgram;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.configuration.GlobalConfiguration;
>> import org.apache.hadoop.fs.Path;
>>
>> import java.io.File;
>> import java.net.URL;
>> import java.util.Collections;
>>
>> public class Niels {
>>
>>    public static void main(String[] args) throws Exception {
>>
>>       final String basePath = "/Users/max/Dev/flink/build-target/";
>>       final String confDir = basePath + "/conf";
>>       final String flinkJarPath = basePath + "/lib/flink-dist_2.10-1.2-SNAPSHOT.jar";
>>
>>       final PackagedProgram packagedProgram =
>>          new PackagedProgram(
>>             new File(basePath + "/examples/streaming/WordCount.jar"),
>>             // We need the Flink jar here because we want to look up the
>>             // main method, which might contain dependencies on Flink that
>>             // are not in the user jar.
>>             Collections.singletonList(new URL("file://" + flinkJarPath)));
>>
>>       final YarnClusterDescriptor descriptor = new YarnClusterDescriptor();
>>
>>       Configuration configuration = GlobalConfiguration.loadConfiguration(confDir);
>>       descriptor.setFlinkConfiguration(configuration);
>>
>>       descriptor.setConfigurationDirectory(confDir);
>>       descriptor.setConfigurationFilePath(new Path(confDir + "/flink-conf.yaml"));
>>
>>       descriptor.setLocalJarPath(new Path(flinkJarPath));
>>       descriptor.setTaskManagerCount(2);
>>       descriptor.setName("Testing the YarnClusterClient");
>>
>>       final YarnClusterClient client = descriptor.deploy();
>>       client.run(packagedProgram, 2);
>>       client.shutdown();
>>    }
>> }
>>
>>
>> On Thu, Aug 25, 2016 at 5:06 PM, Niels Basjes <Ni...@basjes.nl> wrote:
>>>
>>> Sounds good.
>>> Is there a basic example somewhere I can have a look at?
>>>
>>> Niels
>>>
>>> On Thu, Aug 25, 2016 at 2:55 PM, Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>>
>>>> Hi Niels,
>>>>
>>>> If you're using 1.1.1, then you can instantiate the
>>>> YarnClusterDescriptor and supply it with the Flink jar and
>>>> configuration, and subsequently call `deploy()` on it to receive a
>>>> ClusterClient for YARN to which you can submit programs using the
>>>> `run(PackagedProgram program, String args)` method. You can also
>>>> cancel jobs or shut down the cluster from the ClusterClient.
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Thu, Aug 25, 2016 at 10:24 AM, Niels Basjes <Ni...@basjes.nl> wrote:
>>>> > Hi,
>>>> >
>>>> > We have a situation where we need to start a Flink batch job on a YARN
>>>> > cluster the moment an event arrives over a queue.
>>>> > These events occur at a very low rate (like once or twice a week).
>>>> >
>>>> > The idea we have is to run an application that listens to the queue and
>>>> > executes the batch when it receives a message.
>>>> >
>>>> > We found that when we start this using 'flink run -m yarn-cluster ...',
>>>> > the JobManager in YARN is started and the resources for these batches
>>>> > are claimed immediately.
>>>> >
>>>> > What is the recommended way to only claim these resources when we
>>>> > actually have a job to run?
>>>> > Can we 'manually' start and stop the JobManager in YARN in some way
>>>> > from our Java code?
>>>> >
>>>> > --
>>>> > Best regards / Met vriendelijke groeten,
>>>> >
>>>> > Niels Basjes
>>>
>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>
>>
>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes

Re: Delaying starting the jobmanager in yarn?

Posted by Niels Basjes <Ni...@basjes.nl>.
Thanks!
I'm going to work with this next week.

Have a nice weekend.

Niels

On Fri, Aug 26, 2016 at 2:49 PM, Maximilian Michels <mx...@apache.org> wrote:

> It is a bit more involved than I thought. We could simplify the API further:
>
> import org.apache.flink.client.program.PackagedProgram;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.GlobalConfiguration;
> import org.apache.hadoop.fs.Path;
>
> import java.io.File;
> import java.net.URL;
> import java.util.Collections;
>
> public class Niels {
>
>    public static void main(String[] args) throws Exception {
>
>       final String basePath = "/Users/max/Dev/flink/build-target/";
>       final String confDir = basePath + "/conf";
>       final String flinkJarPath = basePath + "/lib/flink-dist_2.10-1.2-SNAPSHOT.jar";
>
>       final PackagedProgram packagedProgram =
>          new PackagedProgram(
>             new File(basePath + "/examples/streaming/WordCount.jar"),
>             // We need the Flink jar here because we want to look up the main method,
>             // which might contain dependencies on Flink that are not in the user jar.
>             Collections.singletonList(new URL("file://" + flinkJarPath)));
>
>       final YarnClusterDescriptor descriptor = new YarnClusterDescriptor();
>
>       Configuration configuration = GlobalConfiguration.loadConfiguration(confDir);
>       descriptor.setFlinkConfiguration(configuration);
>
>       descriptor.setConfigurationDirectory(confDir);
>       descriptor.setConfigurationFilePath(new Path(confDir + "/flink-conf.yaml"));
>
>       descriptor.setLocalJarPath(new Path(flinkJarPath));
>       descriptor.setTaskManagerCount(2);
>       descriptor.setName("Testing the YarnClusterClient");
>
>       final YarnClusterClient client = descriptor.deploy();
>       client.run(packagedProgram, 2);
>       client.shutdown();
>    }
> }
>
>
> On Thu, Aug 25, 2016 at 5:06 PM, Niels Basjes <Ni...@basjes.nl> wrote:
>
>> Sounds good.
>> Is there a basic example somewhere I can have a look at?
>>
>> Niels
>>
>> On Thu, Aug 25, 2016 at 2:55 PM, Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> Hi Niels,
>>>
>>> If you're using 1.1.1, then you can instantiate the
>>> YarnClusterDescriptor and supply it with the Flink jar and
>>> configuration, and subsequently call `deploy()` on it to receive a
>>> ClusterClient for YARN to which you can submit programs using the
>>> `run(PackagedProgram program, String args)` method. You can also
>>> cancel jobs or shut down the cluster from the ClusterClient.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Thu, Aug 25, 2016 at 10:24 AM, Niels Basjes <Ni...@basjes.nl> wrote:
>>> > Hi,
>>> >
>>> > We have a situation where we need to start a Flink batch job on a YARN
>>> > cluster the moment an event arrives over a queue.
>>> > These events occur at a very low rate (like once or twice a week).
>>> >
>>> > The idea we have is to run an application that listens to the queue and
>>> > executes the batch when it receives a message.
>>> >
>>> > We found that when we start this using 'flink run -m yarn-cluster ...',
>>> > the JobManager in YARN is started and the resources for these batches
>>> > are claimed immediately.
>>> >
>>> > What is the recommended way to only claim these resources when we
>>> > actually have a job to run?
>>> > Can we 'manually' start and stop the JobManager in YARN in some way
>>> > from our Java code?
>>> >
>>> > --
>>> > Best regards / Met vriendelijke groeten,
>>> >
>>> > Niels Basjes
>>>
>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Delaying starting the jobmanager in yarn?

Posted by Maximilian Michels <mx...@apache.org>.
It is a bit more involved than I thought. We could simplify the API further:

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.YarnClusterClient;     // from the flink-yarn module
import org.apache.flink.yarn.YarnClusterDescriptor; // from the flink-yarn module
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.net.URL;
import java.util.Collections;

public class Niels {

   public static void main(String[] args) throws Exception {

      final String basePath = "/Users/max/Dev/flink/build-target/";
      final String confDir = basePath + "/conf";
      final String flinkJarPath = basePath + "/lib/flink-dist_2.10-1.2-SNAPSHOT.jar";

      final PackagedProgram packagedProgram =
         new PackagedProgram(
            new File(basePath + "/examples/streaming/WordCount.jar"),
            // We need the Flink jar here because we want to look up the
            // main method, which might contain dependencies on Flink that
            // are not in the user jar.
            Collections.singletonList(new URL("file://" + flinkJarPath)));

      // Describe the YARN session: Flink configuration, the dist jar,
      // and how many TaskManagers to request.
      final YarnClusterDescriptor descriptor = new YarnClusterDescriptor();

      Configuration configuration = GlobalConfiguration.loadConfiguration(confDir);
      descriptor.setFlinkConfiguration(configuration);

      descriptor.setConfigurationDirectory(confDir);
      descriptor.setConfigurationFilePath(new Path(confDir + "/flink-conf.yaml"));

      descriptor.setLocalJarPath(new Path(flinkJarPath));
      descriptor.setTaskManagerCount(2);
      descriptor.setName("Testing the YarnClusterClient");

      // deploy() is the point where the JobManager is started on YARN and
      // the resources are actually claimed.
      final YarnClusterClient client = descriptor.deploy();
      client.run(packagedProgram, 2); // second argument: parallelism
      client.shutdown();              // tears the YARN session down again
   }
}
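
One defensive tweak for the on-demand setup, sketched under the same
assumptions as the example above: wrap the run in try/finally so the YARN
session is torn down and its resources released even if the batch fails.

      final YarnClusterClient client = descriptor.deploy();
      try {
         client.run(packagedProgram, 2);
      } finally {
         // Always release the YARN resources, even on failure.
         client.shutdown();
      }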


On Thu, Aug 25, 2016 at 5:06 PM, Niels Basjes <Ni...@basjes.nl> wrote:

> Sounds good.
> Is there a basic example somewhere I can have a look at?
>
> Niels
>
> On Thu, Aug 25, 2016 at 2:55 PM, Maximilian Michels <mx...@apache.org>
> wrote:
>
>> Hi Niels,
>>
>> If you're using 1.1.1, then you can instantiate the
>> YarnClusterDescriptor and supply it with the Flink jar and
>> configuration, and subsequently call `deploy()` on it to receive a
>> ClusterClient for YARN to which you can submit programs using the
>> `run(PackagedProgram program, String args)` method. You can also
>> cancel jobs or shut down the cluster from the ClusterClient.
>>
>> Cheers,
>> Max
>>
>> On Thu, Aug 25, 2016 at 10:24 AM, Niels Basjes <Ni...@basjes.nl> wrote:
>> > Hi,
>> >
>> > We have a situation where we need to start a Flink batch job on a YARN
>> > cluster the moment an event arrives over a queue.
>> > These events occur at a very low rate (like once or twice a week).
>> >
>> > The idea we have is to run an application that listens to the queue and
>> > executes the batch when it receives a message.
>> >
>> > We found that when we start this using 'flink run -m yarn-cluster ...',
>> > the JobManager in YARN is started and the resources for these batches
>> > are claimed immediately.
>> >
>> > What is the recommended way to only claim these resources when we
>> > actually have a job to run?
>> > Can we 'manually' start and stop the JobManager in YARN in some way
>> > from our Java code?
>> >
>> > --
>> > Best regards / Met vriendelijke groeten,
>> >
>> > Niels Basjes
>>
>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>

Re: Delaying starting the jobmanager in yarn?

Posted by Niels Basjes <Ni...@basjes.nl>.
Sounds good.
Is there a basic example somewhere I can have a look at?

Niels

On Thu, Aug 25, 2016 at 2:55 PM, Maximilian Michels <mx...@apache.org> wrote:

> Hi Niels,
>
> If you're using 1.1.1, then you can instantiate the
> YarnClusterDescriptor and supply it with the Flink jar and
> configuration, and subsequently call `deploy()` on it to receive a
> ClusterClient for YARN to which you can submit programs using the
> `run(PackagedProgram program, String args)` method. You can also
> cancel jobs or shut down the cluster from the ClusterClient.
>
> Cheers,
> Max
>
> On Thu, Aug 25, 2016 at 10:24 AM, Niels Basjes <Ni...@basjes.nl> wrote:
> > Hi,
> >
> > We have a situation where we need to start a Flink batch job on a YARN
> > cluster the moment an event arrives over a queue.
> > These events occur at a very low rate (like once or twice a week).
> >
> > The idea we have is to run an application that listens to the queue and
> > executes the batch when it receives a message.
> >
> > We found that when we start this using 'flink run -m yarn-cluster ...',
> > the JobManager in YARN is started and the resources for these batches
> > are claimed immediately.
> >
> > What is the recommended way to only claim these resources when we
> > actually have a job to run?
> > Can we 'manually' start and stop the JobManager in YARN in some way
> > from our Java code?
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
>



-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Delaying starting the jobmanager in yarn?

Posted by Maximilian Michels <mx...@apache.org>.
Hi Niels,

If you're using 1.1.1, then you can instantiate the
YarnClusterDescriptor and supply it with the Flink jar and
configuration, and subsequently call `deploy()` on it to receive a
ClusterClient for YARN to which you can submit programs using the
`run(PackagedProgram program, String args)` method. You can also
cancel jobs or shut down the cluster from the ClusterClient.

Cheers,
Max
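
A bare-bones sketch of the lifecycle described above (deploy, run, shut
down), assuming YarnClusterDescriptor and YarnClusterClient come from the
flink-yarn module; the descriptor configuration is elided here, and a
full version appears in the code example earlier on this page:

   YarnClusterDescriptor descriptor = new YarnClusterDescriptor();
   // ... supply the Flink jar, configuration, and TaskManager count ...
   YarnClusterClient client = descriptor.deploy(); // JobManager starts on YARN here
   client.run(packagedProgram, 2);                 // submit the PackagedProgram
   client.shutdown();                              // release the YARN resources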

On Thu, Aug 25, 2016 at 10:24 AM, Niels Basjes <Ni...@basjes.nl> wrote:
> Hi,
>
> We have a situation where we need to start a Flink batch job on a YARN
> cluster the moment an event arrives over a queue.
> These events occur at a very low rate (like once or twice a week).
>
> The idea we have is to run an application that listens to the queue and
> executes the batch when it receives a message.
>
> We found that when we start this using 'flink run -m yarn-cluster ...', the
> JobManager in YARN is started and the resources for these batches are
> claimed immediately.
>
> What is the recommended way to only claim these resources when we actually
> have a job to run?
> Can we 'manually' start and stop the JobManager in YARN in some way from
> our Java code?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes