Posted to user-zh@flink.apache.org by chenxyz <ch...@163.com> on 2020/04/01 07:02:25 UTC

Job recovery fails on restart with RocksDB as the state backend: Could not restore keyed state backend for KeyedProcessOperator

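The job uses RocksDB as its state backend. When the job restarts after an exception, recovery frequently fails with "Could not restore keyed state backend for KeyedProcessOperator". How can this be resolved?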

Version: 1.10 standalone

Configuration:

state.backend: rocksdb
state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
state.backend.incremental: true
jobmanager.execution.failover-strategy: region
io.tmp.dirs: /data/flink1_10/tmp

Checkpoint configuration of the job:

env.enableCheckpointing(2 * 60 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Log output:




2020-04-01 11:13:03

java.lang.Exception: Exception while creating StreamOperatorStateContext.

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the 1 provided restore options.

at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

... 9 more

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.

at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)

at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more

Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst -> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst

at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)

at java.nio.file.Files.createLink(Files.java:1086)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)

... 15 more
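
A note on reading the innermost `NoSuchFileException` above: its message has the form `A -> B`. In the JDK, a `FileSystemException` that carries two paths prints `getFile()`, then ` -> `, then `getOtherFile()`, and for `Files.createLink(link, existing)` on Unix the link to be created comes first and the file it should point to comes second. So the `db/001888.sst` path is the hard link that could not be created and the second path is its intended target; the message alone does not say which side was missing. The sketch below reproduces that message shape with plain `java.nio.file`. The class name and the file names `db-001888.sst` and `restore-001888.sst` are made up for illustration, and none of it is Flink code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

class CreateLinkFailureDemo {

    /** Tries to hard-link to a file that was never created and returns the resulting exception. */
    static NoSuchFileException linkToMissingFile() {
        try {
            Path dir = Files.createTempDirectory("link-demo");
            Path link = dir.resolve("db-001888.sst");           // hypothetical link name
            Path existing = dir.resolve("restore-001888.sst");   // hypothetical target, never created
            try {
                Files.createLink(link, existing);
                throw new IllegalStateException("expected createLink to fail");
            } catch (NoSuchFileException e) {
                return e;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        NoSuchFileException e = linkToMissingFile();
        // The message joins both paths with " -> ", link first.
        System.out.println("message:  " + e.getMessage());
        System.out.println("link:     " + e.getFile());
        System.out.println("existing: " + e.getOtherFile());
    }
}
```

Running `main` prints the message next to the two individual paths, which mirrors what one would check by hand on the TaskManager host: whether the target file and the parent directory of the link exist on local disk.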







Error output from the TaskManager:




2020-04-01 14:48:10,726 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  - Caught unexpected exception.

java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline

at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)

at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)

at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)

at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)

at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)

at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)

at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)

at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)

at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)

at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)

at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)

at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)

at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)

at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)

at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)

at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)

at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

2020-04-01 14:48:10,726 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Exception while restoring keyed state backend for KeyedProcessOperator_0cead2d40df6e304d3168f6366c79e3f_(1/2) from alternative (1/1), will retry while more alternatives are available.

org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.

at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)

at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline

at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)

at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)

at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)

at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)

at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)

at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)

at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)

at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)

at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)

at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)

at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)

at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)

at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)

at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)

at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)

at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)

... 15 more

Re: Re: Re: Job recovery fails on restart with RocksDB as the state backend: Could not restore keyed state backend for KeyedProcessOperator

Posted by chenxyz <ch...@163.com>.
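Hi Congxian,
Sorry, your email got lost in a sea of mail...
This is how I reproduce it: I simply restart the TM that is running the job. Restoring the KeyedProcessFunction then fails. The error only occurs with the RocksDB StateBackend; with the FsBackend on HDFS the job recovers normally. At first I thought the custom state inside the KeyedProcessFunction was failing to restore, but in the end I wrote an empty KeyedProcessFunction and the job still could not be restored. A simple demo is attached below.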
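import java.io.Serializable;
import java.util.Random;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every two minutes.
        env.enableCheckpointing(2 * 60 * 1000);

        DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {

            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                // Emit 100 random records, then block until the job is cancelled.
                Random rand = new Random();
                for (int i = 0; i < 100; i++) {
                    int id = rand.nextInt();
                    ctx.collect(new Student(id, "Tom" + id));
                }
                synchronized (this) {
                    while (running) {
                        this.wait();
                    }
                }
            }

            @Override
            public void cancel() {
                synchronized (this) {
                    running = false;
                    this.notifyAll();
                }
            }
        });

        // Pass-through KeyedProcessFunction that declares no state of its own.
        source.keyBy("id").process(new KeyedProcessFunction<Tuple, Student, Student>() {
            @Override
            public void processElement(Student value, Context ctx, Collector<Student> out) throws Exception {
                out.collect(value);
            }
        }).addSink(new SinkFunction<Student>() {
            @Override
            public void invoke(Student value, Context context) throws Exception {
                System.out.println(value);
            }
        });

        env.execute("test keyed process operator state restore....");
    }

    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student implements Serializable {
        private static final long serialVersionUID = 3909702675393996601L;
        private Integer id;
        private String name;
    }
}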

Below is the log with DEBUG enabled:

2020-04-14 11:42:44,679 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Establish JobManager connection for job 6fd13de6e9c84a51425f7cc34ce94940.

2020-04-14 11:42:44,684 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Offer reserved slots to the leader of job 6fd13de6e9c84a51425f7cc34ce94940.

2020-04-14 11:42:44,727 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Registered new allocation id ed04b5323aa885406201e85c9f8b7c78 for local state stores for job 6fd13de6e9c84a51425f7cc34ce94940.

2020-04-14 11:42:44,729 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Registered new local state store with configuration LocalRecoveryConfig{localRecoveryMode=false, localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78], jobID=6fd13de6e9c84a51425f7cc34ce94940, jobVertexID=bc764cd8ddf7a0cff126f51c16239658, subtaskIndex=0}} for 6fd13de6e9c84a51425f7cc34ce94940 - bc764cd8ddf7a0cff126f51c16239658 - 0 under allocation id ed04b5323aa885406201e85c9f8b7c78.

2020-04-14 11:42:44,742 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionFactory  - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Initialized org.apache.flink.runtime.io.network.partition.ResultPartitionFactory@41801faf

2020-04-14 11:42:44,747 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received task Source: Custom Source (1/1).

2020-04-14 11:42:44,748 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) switched from CREATED to DEPLOYING.

2020-04-14 11:42:44,748 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING]

2020-04-14 11:42:44,751 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING].

2020-04-14 11:42:44,752 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Activate slot ed04b5323aa885406201e85c9f8b7c78.

2020-04-14 11:42:44,772 DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - Registered new local state store with configuration LocalRecoveryConfig{localRecoveryMode=false, localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78], jobID=6fd13de6e9c84a51425f7cc34ce94940, jobVertexID=20ba6b65f97481d5570070de90e4e791, subtaskIndex=0}} for 6fd13de6e9c84a51425f7cc34ce94940 - 20ba6b65f97481d5570070de90e4e791 - 0 under allocation id ed04b5323aa885406201e85c9f8b7c78.

2020-04-14 11:42:44,786 DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory  - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d): Created 1 input channels (local: 1, remote: 0, unknown: 0).

2020-04-14 11:42:44,788 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received task KeyedProcess -> Sink: Unnamed (1/1).

2020-04-14 11:42:44,795 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) switched from CREATED to DEPLOYING.

2020-04-14 11:42:44,805 INFO  org.apache.flink.runtime.taskmanager.Task                     - Creating FileSystem stream leak safety net for task KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING]

2020-04-14 11:42:44,812 INFO  org.apache.flink.runtime.taskmanager.Task                     - Loading JAR files for task KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING].

2020-04-14 11:42:44,817 DEBUG org.apache.flink.runtime.blob.FileSystemBlobStore             - Copying from hdfs://nameservice1/data/flink1_10/ha/flink1_10_0/blob/job_6fd13de6e9c84a51425f7cc34ce94940/blob_p-6581a081d862993cf5a06573dbb6621fef1e46b2-f795a9ecd636e88bdf7ddd7746b9ca06 to /data/flink1_10/tmp/blobStore-e924cf2e-5e6c-48c2-893e-c2e9c0a809b6/incoming/temp-00000000.

2020-04-14 11:42:45,060 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received heartbeat request from 4ff74d2e5ff4f66a88688fdeafd2d3ec.

2020-04-14 11:42:45,919 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Getting user code class loader for task ee17273414060c57d2d331a83d1a84fc at library cache manager took 1167 milliseconds

2020-04-14 11:42:45,920 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Getting user code class loader for task 406f2d0b26fb4b1040ae5ac00028202d at library cache manager took 1108 milliseconds

2020-04-14 11:42:45,931 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING].

2020-04-14 11:42:45,931 INFO  org.apache.flink.runtime.taskmanager.Task                     - Registering task at network: KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING].

2020-04-14 11:42:45,934 DEBUG org.apache.flink.runtime.io.network.buffer.LocalBufferPool    - Using a local buffer pool with 2-10 buffers

2020-04-14 11:42:45,934 DEBUG org.apache.flink.runtime.io.network.buffer.LocalBufferPool    - Using a local buffer pool with 0-8 buffers

2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager  - Registered ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions].

2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel  - LocalInputChannel [0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc]: Requesting LOCAL subpartition 0 of partition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc.

2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.TaskEventDispatcher       - registering 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc

2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager  - Requesting subpartition 0 of ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions].

2020-04-14 11:42:45,935 DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition  - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Creating read view for subpartition 0 of partition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc.

2020-04-14 11:42:45,937 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition  - Created PipelinedSubpartitionView(index: 0) of ResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc

2020-04-14 11:42:45,961 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) switched from DEPLOYING to RUNNING.

2020-04-14 11:42:45,963 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Initializing KeyedProcess -> Sink: Unnamed (1/1).

2020-04-14 11:42:45,967 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

2020-04-14 11:42:45,984 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Using partitioner HASH for output 0 of task Source: Custom Source

2020-04-14 11:42:45,992 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using predefined options: DEFAULT.

2020-04-14 11:42:45,992 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.

2020-04-14 11:42:45,994 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) switched from DEPLOYING to RUNNING.

2020-04-14 11:42:45,994 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Initializing Source: Custom Source (1/1).

2020-04-14 11:42:45,994 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

2020-04-14 11:42:45,995 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using predefined options: DEFAULT.

2020-04-14 11:42:45,995 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using default options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.

2020-04-14 11:42:46,033 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Invoking Source: Custom Source (1/1)

2020-04-14 11:42:46,042 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating operator state backend for StreamSource_bc764cd8ddf7a0cff126f51c16239658_(1/1) with empty state.

2020-04-14 11:42:46,057 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask           - Invoking KeyedProcess -> Sink: Unnamed (1/1)

2020-04-14 11:42:46,060 DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl           - Operator c09dc291fad93d575e015871097bfc60 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=StateObjectCollection{[]}, stateSize=0} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.

2020-04-14 11:42:46,060 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(1/1) with empty state.

2020-04-14 11:42:46,069 DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl           - Operator 20ba6b65f97481d5570070de90e4e791 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[IncrementalRemoteKeyedStateHandle{backendIdentifier=04ac09d6-1f1f-4a6c-a78d-74090c83b3c7, keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, checkpointId=1, sharedState={}, privateState={MANIFEST-000006=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/2ff261b8-f51c-42bf-9fab-93c6b119dcff', dataBytes=206}, OPTIONS-000010=File State: hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/426c66a6-d32e-43c8-9873-550237ee0963 [10379 bytes], CURRENT=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/bbbce7c9-ea02-4590-9b18-d7a322deb2f4', dataBytes=16}}, metaStateHandle=File State: hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/chk-1/9215630d-632e-48f6-b668-7dc235a8ff7a [1163 bytes], registered=false}]}, keyedStateFromStream=StateObjectCollection{[]}, stateSize=11764} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.

2020-04-14 11:42:46,070 DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) and restoring with state from alternative (1/1).

2020-04-14 11:42:46,071 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to load RocksDB native library and store it under '/data/flink1_10/tmp'

2020-04-14 11:42:46,071 DEBUG org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to create RocksDB native library folder /data/flink1_10/tmp/rocksdb-lib-a5f35d4dd06539876a20dbabc82a7f33

2020-04-14 11:42:46,078 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf  - -Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true

2020-04-14 11:42:46,079 DEBUG org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf  - -Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true

2020-04-14 11:42:46,080 DEBUG org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory  - Loaded default ResourceLeakDetector: org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@28a9bbee

2020-04-14 11:42:46,150 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Successfully loaded RocksDB native library

2020-04-14 11:42:46,154 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Getting managed memory shared cache for RocksDB.

2020-04-14 11:42:46,161 INFO  org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Obtained shared RocksDB cache of size 53687092 bytes

2020-04-14 11:42:46,495 DEBUG org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation  - Restoring keyed backend uid in operator KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from incremental snapshot to 04ac09d6-1f1f-4a6c-a78d-74090c83b3c7.

2020-04-14 11:42:46,571 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  - Caught unexpected exception.

java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT

        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)

        at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)

        at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)

        at java.nio.file.Files.copy(Files.java:1274)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)

        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

        at java.lang.Thread.run(Thread.java:748)

2020-04-14 11:42:46,576 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Exception while restoring keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from alternative (1/1), will retry while more alternatives are available.

org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.

        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)

        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

        at java.lang.Thread.run(Thread.java:748)

Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT

        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)

        at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)

        at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)

        at java.nio.file.Files.copy(Files.java:1274)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)

        ... 15 more

2020-04-14 11:42:46,579 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) switched from RUNNING to FAILED.

java.lang.Exception: Exception while creating StreamOperatorStateContext.

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)

        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

        at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from any of the 1 provided restore options.

        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

        ... 9 more

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.

        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)

        at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

        at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

        at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

        ... 11 more

Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT

        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)

        at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)

        at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)

        at java.nio.file.Files.copy(Files.java:1274)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

        at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)

        ... 15 more

2020-04-14 11:42:46,588 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d).

2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.taskmanager.Task                     - Release task KeyedProcess -> Sink: Unnamed (1/1) network resources (state: FAILED).

2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate  - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d): Releasing org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate@23ae29cd.

2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition  - ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions]: Received consumed notification for subpartition 0.

2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager  - Received consume notification from ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions].

2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition  - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Releasing ReleaseOnConsumptionResultPartition 0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc [PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions].

2020-04-14 11:42:46,590 DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition  - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Released PipelinedSubpartition#0 [number of buffers: 1 (0 bytes), number of buffers in backlog: 1, finished? false, read view? false].

2020-04-14 11:42:46,590 DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager  - Released partition 0d224b8294583b8fcdf469150870d2a4 produced by ee17273414060c57d2d331a83d1a84fc.

2020-04-14 11:42:46,590 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [FAILED]

2020-04-14 11:42:46,603 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state FAILED to JobManager for task KeyedProcess -> Sink: Unnamed (1/1) 406f2d0b26fb4b1040ae5ac00028202d.











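On 2020-04-03 18:09:19, "Congxian Qiu" <qc...@gmail.com> wrote:
>The paths on HDFS differ from the local ones. If you want to see the HDFS paths, you probably need to look at the checkpoint metadata, which is fairly cumbersome; the tests in
>CheckpointMetadataLoadingTest are a useful reference.
>I took another look at the TM log you posted. It looks like outputStream.close() on line 148 failed (one odd thing: the
>outputStream here is a local file, yet the stack trace shows HadoopFileSystem). Can you reproduce this reliably? If so, could you enable
>debug logging and post the JM/TM logs? A job that reproduces the problem would be even better.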
>
>Best,
>Congxian
>
>
>chenxyz <ch...@163.com> wrote on Wed, Apr 1, 2020 at 5:18 PM:
>
>> Hi Congxian,
>> I checked HDFS:
>> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160 is empty, and there is no db directory under it either.
>>
>>
>>
>>
>>
>>
>>
>>
>> On 2020-04-01 16:50:13, "Congxian Qiu" <qc...@gmail.com> wrote:
>> >Hi
>> >Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files.
>> >Judging from the TM log, the download seems to have failed. Please double-check whether
>>
>> >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
>> >actually exists. If the download did fail, you need to find out why it failed.
>> >
>> >Best,
>> >Congxian
>> >
>> >
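>> >chenxyz <ch...@163.com> wrote on Wed, Apr 1, 2020 at 3:02 PM:
>> >
>> >> The job uses RocksDB as its state backend. When the job restarts after a failure, the restore often fails with "Could not restore keyed state backend for
>> >> KeyedProcessOperator". How can this be fixed?
>> >>
>> >> Version: 1.10 standalone
>> >>
>> >> Configuration: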
>> >>
>> >> state.backend: rocksdb
>> >>
>> >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
>> >>
>> >> state.backend.incremental: true
>> >>
>> >> jobmanager.execution.failover-strategy: region
>> >>
>> >> io.tmp.dirs: /data/flink1_10/tmp
>> >>
>> >>
>> >>
>> >>
>> >> Checkpoint configuration of the job:
>> >>
>> >> env.enableCheckpointing(2 * 60 * 1000);
>> >>
>> >>
>> >>
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> >>
>> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
>> >>
>> >> env.getCheckpointConfig().setCheckpointTimeout(60000);
>> >>
>> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> >>
>> >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
>> >>
>> >>
>> >>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> >>
>> >>
>> >>
>> >>
>> >> Log output:
>> >>
>> >>
>> >>
>> >>
>> >> 2020-04-01 11:13:03
>> >>
>> >> java.lang.Exception: Exception while creating
>> StreamOperatorStateContext.
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >>
>> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >>
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >>
>> >> at java.lang.Thread.run(Thread.java:748)
>> >>
>> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>> >> state backend for
>> >> KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of
>> the
>> >> 1 provided restore options.
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >>
>> >> ... 9 more
>> >>
>> >> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>> Caught
>> >> unexpected exception.
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >>
>> >> ... 11 more
>> >>
>> >> Caused by: java.nio.file.NoSuchFileException:
>> >>
>> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
>> >> ->
>> >>
>> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
>> >>
>> >> at
>> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>> >>
>> >> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>> >>
>> >> at
>> >>
>> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>> >>
>> >> at java.nio.file.Files.createLink(Files.java:1086)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >>
>> >> ... 15 more
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> Error reported by the TaskManager:
>> >>
>> >>
>> >>
>> >>
>> >> 2020-04-01 14:48:10,726 ERROR
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  -
>> >> Caught unexpected exception.
>> >>
>> >> java.io.InterruptedIOException: Interrupted while waiting for data to
>> be
>> >> acknowledged by pipeline
>> >>
>> >> at
>> >>
>> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
>> >>
>> >> at
>> >>
>> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
>> >>
>> >> at
>> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
>> >>
>> >> at
>> >>
>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> >>
>> >> at
>> >>
>> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>> >>
>> >> at
>> >>
>> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> >>
>> >> at
>> >>
>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>> >>
>> >> at
>> >>
>> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>> >>
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>> >>
>> >> at
>> >>
>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>> >>
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
>> >>
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >>
>> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >>
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >>
>> >> at java.lang.Thread.run(Thread.java:748)
>> >>
>> >> 2020-04-01 14:48:10,726 WARN
>> >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure  -
>> >> Exception while restoring keyed state backend for
>> >> KeyedProcessOperator_0cead2d40df6e304d3168f6366c79e3f_(1/2) from
>> >> alternative (1/1), will retry while more alternatives are available.
>> >>
>> >> org.apache.flink.runtime.state.BackendBuildingException: Caught
>> unexpected
>> >> exception.
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >>
>> >> at
>> >>
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >>
>> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >>
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >>
>> >> at java.lang.Thread.run(Thread.java:748)
>> >>
>> >> Caused by: java.io.InterruptedIOException: Interrupted while waiting
>> for
>> >> data to be acknowledged by pipeline
>> >>
>> >> at
>> >>
>> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
>> >>
>> >> at
>> >>
>> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
>> >>
>> >> at
>> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
>> >>
>> >> at
>> >>
>> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> >>
>> >> at
>> >>
>> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>> >>
>> >> at
>> >>
>> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> >>
>> >> at
>> >>
>> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>> >>
>> >> at
>> >>
>> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>> >>
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>> >>
>> >> at
>> >>
>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>> >>
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
>> >>
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >>
>> >> at
>> >>
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >>
>> >> ... 15 more
>>
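
As a rough illustration of the two restore phases described in the quoted reply above (download the files, then restore from them), and of the Files.createLink / Files.copy calls visible in the stack traces, here is a minimal Java sketch. It is not Flink's implementation: the class name RestoreSketch, its method names, and the rule "hard-link .sst files, copy everything else" are assumptions inferred from the traces (a failed createLink for 001888.sst and a failed copy for CURRENT).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a two-phase restore; not Flink code. */
public class RestoreSketch {

    /** Phase 1 check: names of expected files that are missing from the download directory. */
    public static List<String> missingFiles(Path downloadDir, List<String> expectedNames) {
        List<String> missing = new ArrayList<>();
        for (String name : expectedNames) {
            if (!Files.isRegularFile(downloadDir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    /** Phase 2: fill the instance directory by hard-linking .sst files and copying the rest. */
    public static void populateInstanceDir(Path downloadDir, Path instanceDir) {
        try {
            Files.createDirectories(instanceDir);
            try (DirectoryStream<Path> files = Files.newDirectoryStream(downloadDir)) {
                for (Path source : files) {
                    Path target = instanceDir.resolve(source.getFileName());
                    if (source.getFileName().toString().endsWith(".sst")) {
                        // Hard link: fails with NoSuchFileException if the source was never downloaded.
                        Files.createLink(target, source);
                    } else {
                        // Small metadata files such as CURRENT are copied instead.
                        Files.copy(source, target);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If phase 1 leaves the download directory empty or incomplete, phase 2 fails with a NoSuchFileException like the ones in the logs, so running a check such as missingFiles before phase 2 points at the download step rather than at RocksDB itself.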

Re: Re: Job restore fails on restart with RocksDB as the state backend: Could not restore keyed state backend for KeyedProcessOperator

Posted by Congxian Qiu <qc...@gmail.com>.
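The paths on HDFS differ from the local ones. If you want to see the HDFS paths, you probably need to look at the checkpoint metadata, which is fairly cumbersome; the tests in
CheckpointMetadataLoadingTest are a useful reference.
I took another look at the TM log you posted. It looks like outputStream.close() on line 148 failed (one odd thing: the
outputStream here is a local file, yet the stack trace shows HadoopFileSystem). Can you reproduce this reliably? If so, could you enable
debug logging and post the JM/TM logs? A job that reproduces the problem would be even better.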
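
One thing that may be worth ruling out for the local-file versus HadoopFileSystem oddity is how the two configured paths are addressed: state.checkpoints.dir carries an explicit hdfs:// scheme, while io.tmp.dirs is a bare path with no scheme. The sketch below only uses java.net.URI to show that difference. The class name PathSchemeCheck is hypothetical, and whether a scheme-less path ends up on the local disk depends on how the runtime resolves it, which is an assumption to verify and not a diagnosis.

```java
import java.net.URI;

/** Hypothetical helper: reports the URI scheme of a configured path. */
public class PathSchemeCheck {

    /** Returns the scheme of the path, or "(none)" if the path carries no scheme. */
    public static String schemeOf(String configuredPath) {
        String scheme = URI.create(configuredPath).getScheme();
        return scheme == null ? "(none)" : scheme;
    }

    public static void main(String[] args) {
        // Values taken from the configuration quoted in this thread.
        System.out.println(schemeOf("hdfs://nameservice1/data/flink1_10/checkpoint")); // prints "hdfs"
        System.out.println(schemeOf("/data/flink1_10/tmp"));                           // prints "(none)"
    }
}
```

A path that reports "(none)" is the one to inspect on the TaskManager host itself, not on HDFS, when checking whether the downloaded files are present.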

Best,
Congxian


chenxyz <ch...@163.com> wrote on Wed, Apr 1, 2020 at 5:18 PM:

> Hi Congxian,
> I checked HDFS:
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160 is empty, and there is no db directory under it either.
>
>
>
>
>
>
>
>
> On 2020-04-01 16:50:13, "Congxian Qiu" <qc...@gmail.com> wrote:
> >Hi
> >Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files.
> >Judging from the TM log, the download seems to have failed. Please double-check whether
>
> >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
> >actually exists. If the download did fail, you need to find out why it failed.
> >
> >Best,
> >Congxian
> >
> >
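> >chenxyz <ch...@163.com> wrote on Wed, Apr 1, 2020 at 3:02 PM:
> >
> >> The job uses RocksDB as its state backend. When the job restarts after a failure, the restore often fails with "Could not restore keyed state backend for
> >> KeyedProcessOperator". How can this be fixed?
> >>
> >> Version: 1.10 standalone
> >>
> >> Configuration: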
> >>
> >> state.backend: rocksdb
> >>
> >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
> >>
> >> state.backend.incremental: true
> >>
> >> jobmanager.execution.failover-strategy: region
> >>
> >> io.tmp.dirs: /data/flink1_10/tmp
> >>
> >>
> >>
> >>
> >> Checkpoint configuration of the job:
> >>
> >> env.enableCheckpointing(2 * 60 * 1000);
> >>
> >>
> >>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >>
> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
> >>
> >> env.getCheckpointConfig().setCheckpointTimeout(60000);
> >>
> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> >>
> >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
> >>
> >>
> >>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >>
> >>
> >>
> >>
> >> Log output:
> >>
> >>
> >>
> >>
> >> 2020-04-01 11:13:03
> >>
> >> java.lang.Exception: Exception while creating
> StreamOperatorStateContext.
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> >>
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> >> state backend for
> >> KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of
> the
> >> 1 provided restore options.
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> >>
> >> ... 9 more
> >>
> >> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
> Caught
> >> unexpected exception.
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >>
> >> ... 11 more
> >>
> >> Caused by: java.nio.file.NoSuchFileException:
> >>
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
> >> ->
> >>
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
> >>
> >> at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> >>
> >> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> >>
> >> at
> >>
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
> >>
> >> at java.nio.file.Files.createLink(Files.java:1086)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
> >>
> >> ... 15 more
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> TaskManager的报错信息:
> >>
> >>
> >>
> >>
> >> 2020-04-01 14:48:10,726 ERROR
> >>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  -
> >> Caught unexpected exception.
> >>
> >> java.io.InterruptedIOException: Interrupted while waiting for data to
> be
> >> acknowledged by pipeline
> >>
> >> at
> >>
> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
> >>
> >> at
> >>
> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
> >>
> >> at
> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
> >>
> >> at
> >>
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> >>
> >> at
> >>
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> >>
> >> at
> >>
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> >>
> >> at
> >>
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
> >>
> >> at
> >>
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
> >>
> >> at
> >>
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
> >>
> >> at
> >>
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
> >>
> >> at
> >>
> java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
> >>
> >> at
> >>
> java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> >>
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >> 2020-04-01 14:48:10,726 WARN
> >> org.apache.flink.streaming.api.operators.BackendRestorerProcedure  -
> >> Exception while restoring keyed state backend for
> >> KeyedProcessOperator_0cead2d40df6e304d3168f6366c79e3f_(1/2) from
> >> alternative (1/1), will retry while more alternatives are available.
> >>
> >> org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected
> >> exception.
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> >>
> >> at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> >>
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> >>
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >>
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >>
> >> at java.lang.Thread.run(Thread.java:748)
> >>
> >> Caused by: java.io.InterruptedIOException: Interrupted while waiting
> for
> >> data to be acknowledged by pipeline
> >>
> >> at
> >>
> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
> >>
> >> at
> >>
> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
> >>
> >> at
> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
> >>
> >> at
> >>
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> >>
> >> at
> >>
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> >>
> >> at
> >>
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> >>
> >> at
> >>
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
> >>
> >> at
> >>
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
> >>
> >> at
> >>
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
> >>
> >> at
> >>
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
> >>
> >> at
> >>
> java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
> >>
> >> at
> >>
> java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
> >>
> >> at
> >>
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
> >>
> >> ... 15 more
>

Re:Re: Job recovery fails on restart with rocksdb as the state backend: Could not restore keyed state backend for KeyedProcessOperator

Posted by chenxyz <ch...@163.com>.
Hi Congxian,
I checked HDFS: the directory /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160 is empty, and it has no db subdirectory either.
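
A check like this can also be scripted. The sketch below is a minimal standalone Java helper, not part of Flink; the class name RestoreFileCheck and its methods are made up for illustration. It reports whether a path exists and, if it does not, the deepest ancestor directory that does. Since io.tmp.dirs in the posted configuration is /data/flink1_10/tmp, the path is presumably on the local disk of the TaskManager host rather than on HDFS, so the check would have to run on that machine.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical diagnostic helper, not a Flink class.
final class RestoreFileCheck {

    /** Returns the deepest existing path among p and its ancestors, or null if none exists. */
    static Path nearestExisting(Path p) {
        Path cur = p.toAbsolutePath().normalize();
        while (cur != null && !Files.exists(cur)) {
            cur = cur.getParent();
        }
        return cur;
    }

    /** Builds a one-line report saying whether p exists and, if not, where the path breaks off. */
    static String describe(Path p) {
        Path abs = p.toAbsolutePath().normalize();
        if (Files.exists(abs)) {
            return "EXISTS: " + abs;
        }
        return "MISSING: " + abs + " (nearest existing ancestor: " + nearestExisting(abs) + ")";
    }

    public static void main(String[] args) {
        // Pass the paths to inspect as arguments, e.g. the .../db/001888.sst path from the stack trace.
        for (String arg : args) {
            System.out.println(describe(Paths.get(arg)));
        }
    }
}
```

Running it with the db/001888.sst path from the exception shows how far down the directory tree actually exists, which helps tell "the download never wrote anything" apart from "the directory was cleaned up afterwards".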








At 2020-04-01 16:50:13, "Congxian Qiu" <qc...@gmail.com> wrote:
>Hi
>Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files.
>From the TM log it looks like the download went wrong. Please double-check whether this file exists:
>/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
>If the download did fail, you need to find out why it failed.
>
>Best,
>Congxian

Re: Job recovery fails on restart with rocksdb as the state backend: Could not restore keyed state backend for KeyedProcessOperator

Posted by Congxian Qiu <qc...@gmail.com>.
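Hi
Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files.
From the TM log it looks like the download went wrong. Please double-check whether this file exists:
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
If the download did fail, you need to find out why it failed.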
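
To illustrate how the two parts interact, here is a minimal standalone Java sketch. It is not Flink's code: the class TwoPhaseRestoreSketch, its methods, and the directory names "download" and "db" are made up for illustration. The only thing taken from the job's stack trace is that the failing call is java.nio.file.Files.createLink and that it throws NoSuchFileException. The sketch shows that when part 1 never writes the file, the hard-link step in part 2 fails with that same exception type.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration of a download-then-link restore; not a Flink class.
final class TwoPhaseRestoreSketch {

    /** Part 1 stand-in: "downloads" one file into downloadDir, or writes nothing if succeed is false. */
    static void download(Path downloadDir, String name, boolean succeed) throws IOException {
        Files.createDirectories(downloadDir);
        if (succeed) {
            Files.write(downloadDir.resolve(name), "sst-bytes".getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Part 2 stand-in: hard-links the downloaded file into the instance directory. */
    static void linkIntoInstanceDir(Path downloadDir, Path instanceDir, String name) throws IOException {
        Files.createDirectories(instanceDir);
        // createLink(link, existing): fails if the existing (downloaded) file is not there.
        Files.createLink(instanceDir.resolve(name), downloadDir.resolve(name));
    }

    /** Runs both parts under base; returns "OK" or the simple class name of the IOException thrown. */
    static String restore(Path base, boolean downloadSucceeds) {
        try {
            Path downloadDir = base.resolve("download");
            download(downloadDir, "001888.sst", downloadSucceeds);
            linkIntoInstanceDir(downloadDir, base.resolve("db"), "001888.sst");
            return "OK";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(restore(Files.createTempDirectory("restore-ok"), true));
        System.out.println(restore(Files.createTempDirectory("restore-fail"), false));
    }
}
```

In other words, the NoSuchFileException in the job log is likely a follow-on symptom, and the InterruptedIOException in the TaskManager log during the download is the place to look for the cause.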

Best,
Congxian


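chenxyz <ch...@163.com> wrote on Wed, Apr 1, 2020 at 3:02 PM:

> With rocksdb enabled as the state backend, the job often fails when it restarts
> after an exception, with "Could not restore keyed state backend for
> KeyedProcessOperator". How can this be solved?
>
> Version: 1.10 standalone
>
> Configuration: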
>
> state.backend: rocksdb
>
> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
>
> state.backend.incremental: true
>
> jobmanager.execution.failover-strategy: region
>
> io.tmp.dirs: /data/flink1_10/tmp
>
>
>
>
> Checkpoint configuration of the job:
>
> env.enableCheckpointing(2 * 60 * 1000);
>
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
>
> env.getCheckpointConfig().setCheckpointTimeout(60000);
>
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
>
> Log output:
>
>
>
>
> 2020-04-01 11:13:03
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the 1 provided restore options.
>
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>
> ... 9 more
>
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>
> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
> ... 11 more
>
> Caused by: java.nio.file.NoSuchFileException:
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
> ->
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
>
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>
> at
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>
> at java.nio.file.Files.createLink(Files.java:1086)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>
> ... 15 more
>
>
>
>
>
>
>
> Error messages from the TaskManager:
>
>
>
>
> 2020-04-01 14:48:10,726 ERROR
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  -
> Caught unexpected exception.
>
> java.io.InterruptedIOException: Interrupted while waiting for data to be
> acknowledged by pipeline
>
> at
> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
>
> at
> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
>
> at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
>
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>
> at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>
> at
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>
> at
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>
> at
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>
> at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>
> at
> java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
>
> at
> java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> 2020-04-01 14:48:10,726 WARN
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure  -
> Exception while restoring keyed state backend for
> KeyedProcessOperator_0cead2d40df6e304d3168f6366c79e3f_(1/2) from
> alternative (1/1), will retry while more alternatives are available.
>
> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected
> exception.
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.io.InterruptedIOException: Interrupted while waiting for
> data to be acknowledged by pipeline
>
> at
> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
>
> at
> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
>
> at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
>
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>
> at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>
> at
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>
> at
> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>
> at
> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>
> at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>
> at
> java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
>
> at
> java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>
> at
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>
> ... 15 more