Posted to user@flink.apache.org by Boris Lublinsky <bo...@lightbend.com> on 2020/08/20 22:14:02 UTC

Flink checkpointing with Azure block storage

Is there a complete configuration example somewhere for this option?

Re: Flink checkpointing with Azure block storage

Posted by Boris Lublinsky <bo...@lightbend.com>.
Thanks Piyush,
This is the thing that I was missing.
Now it all works.


> On Aug 24, 2020, at 2:44 PM, Piyush Narang <p....@criteo.com> wrote:
> 
> We had something like this when we were setting it in our code (now we’re passing it via config). There’s likely a better/cleaner way:
> private def configureCheckpoints(env: StreamExecutionEnvironment,
>                                  checkpointPath: String): Unit = {
>   if (checkpointPath.startsWith("wasb")) {
>     import org.apache.hadoop.fs.{Path => HPath}
>     import org.apache.flink.configuration.Configuration
>     import org.apache.flink.core.fs.FileSystem
> 
>     val jobCheckpointsPath = new HPath(checkpointPath)
>     val conf = new Configuration()
>     conf.setString(
>       "fs.azure.account.key.storage-account.blob.core.windows.net",
>       "access-key"
>     )
>     FileSystem.initialize(conf) // ensures the Azure FS is initialized with the correct credentials
>   }
>   // other checkpoint config stuff
> }
>  
> -- Piyush


Re: Flink checkpointing with Azure block storage

Posted by Piyush Narang <p....@criteo.com>.
We had something like this when we were setting it in our code (now we’re passing it via config). There’s likely a better/cleaner way:
private def configureCheckpoints(env: StreamExecutionEnvironment,
                                 checkpointPath: String): Unit = {
  if (checkpointPath.startsWith("wasb")) {
    import org.apache.hadoop.fs.{Path => HPath}
    import org.apache.flink.configuration.Configuration

    import org.apache.flink.core.fs.FileSystem

    val jobCheckpointsPath = new HPath(checkpointPath)
    val conf = new Configuration()
    conf.setString(
      "fs.azure.account.key.storage-account.blob.core.windows.net",
      "access-key"
    )
    FileSystem.initialize(conf) // ensures the Azure FS is initialized with the correct credentials
  }
  // other checkpoint config stuff
}

-- Piyush




Re: Flink checkpointing with Azure block storage

Posted by Boris Lublinsky <bo...@lightbend.com>.
Thanks Yun,
I made it work, but now I want to set the appropriate config programmatically.
I can set state.checkpoints.dir with:

val fsStateBackend = new FsStateBackend(new URI("wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>"))
env.setStateBackend(fsStateBackend.asInstanceOf[StateBackend])

But I can’t update the configuration to add the credentials fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>
because getConfiguration is a private method. Any suggestions?
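
For illustration only, here is a minimal sketch of how the two strings mentioned above could be derived from the account and container names instead of being hardcoded. The object and method names (AzureCheckpointSettings, accountKeyProperty, checkpointUri) are hypothetical and not part of Flink, and the sketch assumes the "$" in the URI template above is not part of the real URI.

```scala
import java.net.URI

// Hypothetical helper, not a Flink API: it only builds the two strings
// discussed in this thread from their parts.
object AzureCheckpointSettings {
  // Assumption: accounts use the standard blob endpoint suffix.
  private val EndpointSuffix = "blob.core.windows.net"

  /** Name of the property that carries the storage account access key. */
  def accountKeyProperty(account: String): String =
    s"fs.azure.account.key.${account}.${EndpointSuffix}"

  /** wasb:// URI for the checkpoint directory; a leading "/" in path is tolerated. */
  def checkpointUri(container: String, account: String, path: String): URI =
    new URI(s"wasb://${container}@${account}.${EndpointSuffix}/${path.stripPrefix("/")}")
}
```

The property name would be the first argument of a conf.setString call like the one in Piyush's reply, and the URI would go to the FsStateBackend constructor shown above. The access key itself is best read from the environment or a secret store rather than written into the source.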






Re: Flink checkpointing with Azure block storage

Posted by Yun Tang <my...@live.com>.
Hi Boris

I think the official guide [1] should be enough to tell you how to configure it.
However, I think your changes to flink-conf.yaml might not have taken effect: you configured the state backend as 'filesystem', yet the logs still say "No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend".

You can search the log for "Loading configuration property" to see whether your changes were picked up.
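
As a rough sketch of that check, the following scans log lines for the marker text and a given key. ConfigLogCheck and its methods are hypothetical names; the only thing taken from this thread is the marker string, and the rest of the log line format is an assumption.

```scala
// Hypothetical helper for the log check described above; not a Flink API.
object ConfigLogCheck {
  // Marker text quoted in this thread.
  val Marker = "Loading configuration property"

  /** Log lines that contain the marker and mention the given key.
    * This is a plain substring match, so "state.backend" would also
    * match longer keys that begin with "state.backend". */
  def loadedLinesFor(key: String, logLines: Seq[String]): Seq[String] =
    logLines.filter(line => line.contains(Marker) && line.contains(key))

  /** True if at least one matching line was found. */
  def wasLoaded(key: String, logLines: Seq[String]): Boolean =
    loadedLinesFor(key, logLines).nonEmpty
}
```

The lines could come from scala.io.Source.fromFile(...).getLines().toSeq on whatever log file the job writes. If wasLoaded("state.backend", lines) is false, the flink-conf.yaml in question was most likely never read.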

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/azure.html#credentials-configuration

Best
Yun Tang



Re: Flink checkpointing with Azure block storage

Posted by Boris Lublinsky <bo...@lightbend.com>.
To test it, I created a flink-conf.yaml file and put it in the resource directory of my project.
The file contains the following:

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
state.checkpoints.dir: wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>

fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that

This should have produced an error,

but what I see is that it does not seem to take effect:


313 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
3327 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@5ef4eacb for Flink Streaming Job (427dae12e8f7243742ae8bd152467edc).
3329 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender akka://flink/user/rpc/jobmanager_3


> On Aug 20, 2020, at 5:14 PM, Boris Lublinsky <bo...@lightbend.com> wrote:
> 
> Is there a complete configuration example somewhere for this option?