Posted to user@flink.apache.org by Victor Godoy Poluceno <vi...@gmail.com> on 2017/07/25 12:45:49 UTC

Unable to make mapWithState work correctly

Hi,

I am trying to write a simple streaming program to count values from a
Kafka topic in a fault tolerant manner, like this
<https://gist.github.com/victorpoluceno/8690df8459bf3afd60477f83ec78f7a8>:

<code>
val config: Configuration = new Configuration()
config.setString(ConfigConstants.STATE_BACKEND, "filesystem")
config.setString("state.backend.fs.checkpointdir", "file:///tmp/flink")

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(10)

val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

val stream = env
    .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
    .map((_, 1))
    .keyBy(_._1)
    .mapWithState((in: (String, Int), count: Option[Int]) => {
      val newCount = in._2 + count.getOrElse(0)
      ((in._1, newCount), Some(newCount))
    }).print

env.execute("Job")
</code>

The idea is to use the filesystem state backend to persist the computation
state (count) and to restore it in case of failure or restart. I have a
program that injects the same key into Kafka. But I am unable to make Flink
work correctly: every time Flink restarts, the value from state is empty, so
the count starts from zero. What am I missing here?

I am running this on a local environment (sbt run) with Flink 1.3.1, Java
1.8.0_131, and Ubuntu 16.04.

-- 
hooray!

--
Victor Godoy Poluceno

Re: Unable to make mapWithState work correctly

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Victor,
from a quick look at your code, I think you set up everything just fine (I'm
not too familiar with Scala, though), but the problem is probably somewhere
else:
As [1] states (a bit hidden, maybe), checkpoints are only used to recover from
failures, e.g. if you run your job on 2 task managers and one of them dies. In
that case, Flink's job manager will try to re-schedule the job and restart it
from the latest checkpoint.

I guess what you want is a savepoint [2] (or an externalized checkpoint
described in [1]) to be able to restore your program manually during start. If
you run your program in a "real" Flink environment as started from one of our
startup scripts, you can go straight ahead to
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html#operations
to see how to create savepoints and restore from them.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
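
For illustration, here is a minimal sketch of how the job might opt in to the
externalized checkpoints described in [1], assuming Flink 1.3.x on the
classpath. The "state.checkpoints.dir" path, the 10-second interval, and the
socket source (standing in for Kafka so the sketch needs no broker) are
assumptions for this example, not part of the original program. Whether a
local environment started via sbt honors these settings in the same way as a
cluster is also an assumption worth verifying.

```scala
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

object ExternalizedCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val config = new Configuration()
    config.setString(ConfigConstants.STATE_BACKEND, "filesystem")
    config.setString("state.backend.fs.checkpointdir", "file:///tmp/flink")
    // Assumed location for externalized checkpoint metadata (hypothetical path).
    config.setString("state.checkpoints.dir", "file:///tmp/flink-externalized")

    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    // Checkpoint every 10 seconds (assumed interval) with exactly-once semantics.
    env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE)
    // Keep the latest checkpoint when the job is cancelled, so that it can be
    // used for a manual restore later.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    env
      .socketTextStream("localhost", 9999) // stand-in source, not the Kafka consumer
      .map((_, 1))
      .keyBy(_._1)
      .mapWithState((in: (String, Int), count: Option[Int]) => {
        val newCount = in._2 + count.getOrElse(0)
        ((in._1, newCount), Some(newCount))
      })
      .print()

    env.execute("Job")
  }
}
```

On a cluster started from the startup scripts, a retained checkpoint or a
savepoint can then be passed to "bin/flink run -s <path>" when resubmitting
the job, as described in [1] and [2].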
