Posted to dev@flink.apache.org by Oleksandr Serdiukov <de...@serdukoff.me> on 2018/08/20 15:53:47 UTC
Re: Flink checkpointing to Google Cloud Storage
Now I am able to write checkpoints, but I cannot restore from them:
java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
My current setup:
<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoop2-1.9.5</version>
</dependency>
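
Writing checkpoints works, so the connector classes apparently resolve on the path that creates them; the NoClassDefFoundError for the inner class GoogleCloudStorageImpl$6 on restore makes me suspect the jar visible to the restoring task manager is missing or truncated. Below is a minimal sketch (the class name and the object path are placeholders of mine, and it assumes the gcs-connector jar, core-site.xml and the Flink runtime are on the classpath of the JVM it runs in) that exercises the same FileSystem.get()/open() path the restore stack trace goes through:

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class GcsSchemeCheck {
    public static void main(String[] args) throws Exception {
        // Resolve the gs:// scheme the same way the restore code does
        // (FileSystem.get -> HadoopFileSystem wrapping GoogleHadoopFileSystem).
        FileSystem fs = FileSystem.get(new Path("gs://checkpoints/").toUri());
        System.out.println(fs.getClass().getName());

        // Opening an object goes through GoogleCloudStorageImpl.open(), the frame
        // where the NoClassDefFoundError surfaces, so a missing or truncated
        // connector jar on *this* classpath should show up here.
        // "some-existing-object" is a placeholder; point it at any object in the bucket.
        try (FSDataInputStream in = fs.open(new Path("gs://checkpoints/some-existing-object"))) {
            System.out.println("open() succeeded, first byte: " + in.read());
        }
    }
}

Running this against the exact classpath of the task manager container should tell whether the connector classes are fully available at restore time.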
On Thu, Aug 16, 2018 at 11:55 AM, Oleksandr Serdiukov <de...@serdukoff.me>
wrote:
> Hello All!
>
> I am trying to configure checkpoints for Flink jobs in GCS.
> Unfortunately, it fails after submitting a job. I run it using
> docker-compose on my local machine.
>
> Any thoughts on this?
> Thanks!
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Could not find a file system implementation for scheme 'gs'. The scheme is
> not directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
> ... 33 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
> at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>
>
> Env configuration is like this:
>
> StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> checkpointConfig.setFailOnCheckpointingErrors(false);
> checkpointConfig.setCheckpointInterval(10000);
> checkpointConfig.setMinPauseBetweenCheckpoints(5000);
> checkpointConfig.setMaxConcurrentCheckpoints(1);
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> RocksDBStateBackend rocksDBStateBackend =
>         new RocksDBStateBackend(String.format("gs://checkpoints/%s", jobClass.getSimpleName()), true);
> env.setStateBackend((StateBackend) rocksDBStateBackend);
>
>
> Here is my `core-site.xml` file:
>
> <configuration>
>     <property>
>         <name>google.cloud.auth.service.account.enable</name>
>         <value>true</value>
>     </property>
>     <property>
>         <name>google.cloud.auth.service.account.json.keyfile</name>
>         <value>${user.dir}/key.json</value>
>     </property>
>     <property>
>         <name>fs.gs.impl</name>
>         <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
>         <description>The FileSystem for gs: (GCS) uris.</description>
>     </property>
>     <property>
>         <name>fs.AbstractFileSystem.gs.impl</name>
>         <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
>         <description>The AbstractFileSystem for gs: (GCS) uris.</description>
>     </property>
>     <property>
>         <name>fs.gs.application.name.suffix</name>
>         <value>-kube-flink</value>
>         <description>
>             Appended to the user-agent header for API requests to GCS to help identify
>             the traffic as coming from Dataproc.
>         </description>
>     </property>
> </configuration>
>
> Dependencies on gcs-connector and Hadoop:
>
> <dependency>
>     <groupId>com.google.cloud.bigdataoss</groupId>
>     <artifactId>gcs-connector</artifactId>
>     <version>1.9.4-hadoop2</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.hadoop</groupId>
>     <artifactId>hadoop-common</artifactId>
>     <version>2.9.1</version>
> </dependency>
>
>