Posted to dev@flink.apache.org by Oleksandr Serdiukov <de...@serdukoff.me> on 2018/08/20 15:53:47 UTC

Re: Flink checkpointing to Google Cloud Storage

Now I am able to write checkpoints but cannot restore from them:

java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
	at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)


My current setup:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoop2-1.9.5</version>
</dependency>
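
For debugging, a quick classpath probe along these lines (just a sketch; the class names are taken from the stack trace above) shows whether the connector classes resolve in the classloader that restores the job:

    // Diagnostic sketch: verify that the gcs-connector classes resolve in the
    // classloader running the job (class names taken from the trace above).
    public class GcsClasspathProbe {
        public static void main(String[] args) throws ClassNotFoundException {
            Class.forName("com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl");
            Class.forName("com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
            System.out.println("gcs-connector classes resolved");
        }
    }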


On Thu, Aug 16, 2018 at 11:55 AM, Oleksandr Serdiukov <de...@serdukoff.me>
wrote:

> Hello All!
>
> I am trying to configure checkpointing to GCS for Flink jobs.
> Unfortunately, it fails after the job is submitted. I am running it with
> docker-compose on my local machine.
>
> Any thoughts on this?
> Thanks!
>
>     Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
>         at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>         at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
>         at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
>         at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
>         at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
>         ... 33 more
>     Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
>         at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>         at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>
> Env configuration is like this:
>
>     StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>     checkpointConfig.setFailOnCheckpointingErrors(false);
>     checkpointConfig.setCheckpointInterval(10000);
>     checkpointConfig.setMinPauseBetweenCheckpoints(5000);
>     checkpointConfig.setMaxConcurrentCheckpoints(1);
>     checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
>             String.format("gs://checkpoints/%s", jobClass.getSimpleName()), true);
>     env.setStateBackend((StateBackend) rocksDBStateBackend);
>
> Here is my `core-site.xml` file:
>
>     <configuration>
>         <property>
>             <name>google.cloud.auth.service.account.enable</name>
>             <value>true</value>
>         </property>
>         <property>
>             <name>google.cloud.auth.service.account.json.keyfile</name>
>             <value>${user.dir}/key.json</value>
>         </property>
>         <property>
>             <name>fs.gs.impl</name>
>             <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
>             <description>The FileSystem for gs: (GCS) uris.</description>
>         </property>
>         <property>
>             <name>fs.AbstractFileSystem.gs.impl</name>
>             <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
>             <description>The AbstractFileSystem for gs: (GCS) uris.</description>
>         </property>
>         <property>
>             <name>fs.gs.application.name.suffix</name>
>             <value>-kube-flink</value>
>             <description>
>                 Appended to the user-agent header for API requests to GCS to
>                 help identify the traffic as coming from Dataproc.
>             </description>
>         </property>
>     </configuration>
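>
> To check that this core-site.xml is actually picked up, a standalone Hadoop-level probe can help (a sketch; it assumes hadoop-common, the gcs-connector, and the XML file itself are all on the classpath):
>
>     import java.net.URI;
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.fs.FileSystem;
>
>     public class HadoopGsProbe {
>         public static void main(String[] args) throws Exception {
>             // new Configuration() loads core-site.xml from the classpath.
>             Configuration conf = new Configuration();
>             FileSystem fs = FileSystem.get(new URI("gs://checkpoints/"), conf);
>             // Expect com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem here.
>             System.out.println(fs.getClass().getName());
>         }
>     }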
>
> Dependencies on the gcs-connector and Hadoop:
>
>     <dependency>
>         <groupId>com.google.cloud.bigdataoss</groupId>
>         <artifactId>gcs-connector</artifactId>
>         <version>1.9.4-hadoop2</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.hadoop</groupId>
>         <artifactId>hadoop-common</artifactId>
>         <version>2.9.1</version>
>     </dependency>
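>
> An alternative I have not tried yet would be to mark Hadoop as provided and drop the hadoop and gcs-connector jars into Flink's lib/ directory instead of bundling them into the job jar, e.g.:
>
>     <dependency>
>         <groupId>org.apache.hadoop</groupId>
>         <artifactId>hadoop-common</artifactId>
>         <version>2.9.1</version>
>         <scope>provided</scope>
>     </dependency>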
>
>