You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/26 02:51:20 UTC
[incubator-seatunnel] branch st-engine updated: [New-Engine]Support Hdfs Storage and async storage (#2485)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 72e66272d [New-Engine]Support Hdfs Storage and async storage (#2485)
72e66272d is described below
commit 72e66272d5ab54d8d8d54fba8a4a1193c94ba3d9
Author: Kirs <ki...@apache.org>
AuthorDate: Fri Aug 26 10:51:15 2022 +0800
[New-Engine]Support Hdfs Storage and async storage (#2485)
* [New-Engine]Checkpoint storage
Support local file storage plugin
Support proto-stuff serializer
Support SPI load check-point storage
* Improve getLatest method
* refactoring some methods
* refactor init storage instance
* [New-Engine]support checkpoint storage
* [New-Engine]Support Hdfs Storage and async storage
Support config storage namespace
Support windows local file storage
Support async submit task
* fix license header
* fix ut
* fix some log and improve name split method
* fix some log and improve name split method
* fix code style
* fix code style
* remove hflush
---
.../operation/source/SourceRegisterOperation.java | 1 -
.../storage/api/AbstractCheckpointStorage.java | 119 +++++++++-
.../checkpoint/storage/api/CheckpointStorage.java | 8 +
.../storage/api/CheckpointStorageFactory.java | 4 +-
.../storage/common/ProtoStuffSerializer.java | 10 +-
.../storage/common/StorageThreadFactory.java | 50 ++++
.../StorageConstants.java} | 26 +-
.../pom.xml | 25 +-
.../checkpoint/storage/hdfs/HdfsConstants.java} | 27 +--
.../checkpoint/storage/hdfs/HdfsStorage.java | 261 +++++++++++++++++++++
.../storage/hdfs/HdfsStorageFactory.java} | 16 +-
.../checkpoint-storage-local-file/pom.xml | 13 +-
.../storage/localfile/LocalFileStorage.java | 104 ++++----
.../storage/localfile/LocalFileStorageFactory.java | 2 +-
.../storage/localfile/LocalFileStorageTest.java | 4 +-
.../checkpoint-storage-plugins/pom.xml | 6 +
16 files changed, 542 insertions(+), 134 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 63e9ab4fb..1f89f1f9d 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -63,7 +63,6 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
return null;
}, new RetryUtils.RetryMaterial(RETRY_TIME, true,
exception -> exception instanceof NullPointerException, RETRY_TIME_OUT));
-
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
index e7774e3cf..acf607b7e 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java
@@ -23,12 +23,26 @@ package org.apache.seatunnel.engine.checkpoint.storage.api;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
import org.apache.seatunnel.engine.checkpoint.storage.common.Serializer;
+import org.apache.seatunnel.engine.checkpoint.storage.common.StorageThreadFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import lombok.extern.slf4j.Slf4j;
+
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+@Slf4j
public abstract class AbstractCheckpointStorage implements CheckpointStorage {
/**
@@ -36,23 +50,31 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
*/
private final Serializer serializer = new ProtoStuffSerializer();
+ public static final String DEFAULT_CHECKPOINT_FILE_PATH_SPLIT = "/";
+
/**
* storage root directory
+ * if not set, use default value
*/
- private static final String CHECKPOINT_DEFAULT_FILE_DIR = "/tmp/seatunnel/checkpoint/";
+ private String storageNameSpace = "/seatunnel/checkpoint/";
public static final String FILE_NAME_SPLIT = "-";
- public static final int FILE_NAME_PIPELINE_ID_INDEX = 1;
+ public static final int FILE_NAME_PIPELINE_ID_INDEX = 2;
public static final int FILE_SORT_ID_INDEX = 0;
- public static final String FILE_FORMAT = "txt";
+ public static final int FILE_NAME_RANDOM_RANGE = 1000;
- /**
- * record order
- */
- private final AtomicLong counter = new AtomicLong(0);
+ public static final String FILE_FORMAT = "ser";
+
+ private volatile ExecutorService executorService;
+
+ private static final int DEFAULT_THREAD_POOL_MIN_SIZE = Runtime.getRuntime().availableProcessors() * 2 + 1;
+
+ private static final int DEFAULT_THREAD_POOL_MAX_SIZE = Runtime.getRuntime().availableProcessors() * 4 + 1;
+
+ private static final int DEFAULT_THREAD_POOL_QUENE_SIZE = 1024;
/**
* init storage instance
@@ -65,11 +87,11 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
public abstract void initStorage(Map<String, String> configuration) throws CheckpointStorageException;
public String getStorageParentDirectory() {
- return CHECKPOINT_DEFAULT_FILE_DIR;
+ return storageNameSpace;
}
public String getCheckPointName(PipelineState state) {
- return counter.incrementAndGet() + FILE_NAME_SPLIT + state.getPipelineId() + FILE_NAME_SPLIT + state.getCheckpointId() + "." + FILE_FORMAT;
+ return System.nanoTime() + FILE_NAME_SPLIT + ThreadLocalRandom.current().nextInt(FILE_NAME_RANDOM_RANGE) + FILE_NAME_SPLIT + state.getPipelineId() + FILE_NAME_SPLIT + state.getCheckpointId() + "." + FILE_FORMAT;
}
public byte[] serializeCheckPointData(PipelineState state) throws IOException {
@@ -79,4 +101,83 @@ public abstract class AbstractCheckpointStorage implements CheckpointStorage {
public PipelineState deserializeCheckPointData(byte[] data) throws IOException {
return serializer.deserialize(data, PipelineState.class);
}
+
+ public void setStorageNameSpace(String storageNameSpace) {
+ if (storageNameSpace != null) {
+ this.storageNameSpace = storageNameSpace;
+ }
+ }
+
+ public Set<String> getLatestPipelineNames(List<String> fileNames) {
+ Map<String, String> latestPipelineMap = new HashMap<>();
+ fileNames.forEach(fileName -> {
+ String[] fileNameSegments = fileName.split(FILE_NAME_SPLIT);
+ long fileVersion = Long.parseLong(fileNameSegments[FILE_SORT_ID_INDEX]);
+ String filePipelineId = fileNameSegments[FILE_NAME_PIPELINE_ID_INDEX];
+ if (latestPipelineMap.containsKey(filePipelineId)) {
+ long oldVersion = Long.parseLong(latestPipelineMap.get(filePipelineId).split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
+ if (fileVersion > oldVersion) {
+ latestPipelineMap.put(filePipelineId, fileName);
+ }
+ } else {
+ latestPipelineMap.put(filePipelineId, fileName);
+ }
+ });
+ Set<String> latestPipelines = new HashSet<>(latestPipelineMap.size());
+ latestPipelineMap.forEach((pipelineId, fileName) -> latestPipelines.add(fileName));
+ return latestPipelines;
+ }
+
+ /**
+ * get latest checkpoint file name
+ *
+ * @param fileNames file names
+ * @return latest checkpoint file name
+ */
+ public String getLatestCheckpointFileNameByJobIdAndPipelineId(List<String> fileNames, String pipelineId) {
+ AtomicReference<String> latestFileName = new AtomicReference<>();
+ AtomicLong latestVersion = new AtomicLong();
+ fileNames.forEach(fileName -> {
+ String[] fileNameSegments = fileName.split(FILE_NAME_SPLIT);
+ long fileVersion = Long.parseLong(fileNameSegments[FILE_SORT_ID_INDEX]);
+ String filePipelineId = fileNameSegments[FILE_NAME_PIPELINE_ID_INDEX];
+ if (pipelineId.equals(filePipelineId) && fileVersion > latestVersion.get()) {
+ latestVersion.set(fileVersion);
+ latestFileName.set(fileName);
+ }
+ });
+ return latestFileName.get();
+ }
+
+ /**
+ * get the latest checkpoint file name
+ *
+ * @param fileName file names. note: file name cannot contain parent path
+ * @return latest checkpoint file name
+ */
+ public String getPipelineIdByFileName(String fileName) {
+ return fileName.split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+ }
+
+ @Override
+ public void asyncStoreCheckPoint(PipelineState state) {
+ initExecutor();
+ this.executorService.submit(() -> {
+ try {
+ storeCheckPoint(state);
+ } catch (Exception e) {
+ log.error(String.format("store checkpoint failed, job id : %s, pipeline id : %d", state.getJobId(), state.getPipelineId()), e);
+ }
+ });
+ }
+
+ private void initExecutor() {
+ if (null == this.executorService || this.executorService.isShutdown()) {
+ synchronized (this) {
+ if (null == this.executorService || this.executorService.isShutdown()) {
+ this.executorService = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_MIN_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(DEFAULT_THREAD_POOL_QUENE_SIZE), new StorageThreadFactory());
+ }
+ }
+ }
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
index c9fb732a6..2d38c6b36 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorage.java
@@ -35,6 +35,14 @@ public interface CheckpointStorage {
*/
String storeCheckPoint(PipelineState state) throws CheckpointStorageException;
+ /**
+ * async save checkpoint to storage
+ *
+ * @param state PipelineState
+ * @throws CheckpointStorageException if save checkpoint failed
+ */
+ void asyncStoreCheckPoint(PipelineState state) throws CheckpointStorageException;
+
/**
* get all checkpoint from storage
* if no data found, return empty list
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
index 0723b5d84..73cb777b8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
@@ -20,6 +20,8 @@
package org.apache.seatunnel.engine.checkpoint.storage.api;
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
import java.util.Map;
/**
@@ -43,5 +45,5 @@ public interface CheckpointStorageFactory {
* value: "fs.defaultFS"
* return storage plugin instance
*/
- CheckpointStorage create(Map<String, String> configuration);
+ CheckpointStorage create(Map<String, String> configuration) throws CheckpointStorageException;
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/ProtoStuffSerializer.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/ProtoStuffSerializer.java
index 32288ece5..5893e0c64 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/ProtoStuffSerializer.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/ProtoStuffSerializer.java
@@ -30,11 +30,6 @@ import java.util.concurrent.ConcurrentHashMap;
public class ProtoStuffSerializer implements Serializer {
- /**
- * Consider configurable
- */
- private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(1024);
-
/**
* At the moment it looks like we only have one Schema.
*/
@@ -48,12 +43,13 @@ public class ProtoStuffSerializer implements Serializer {
@Override
public <T> byte[] serialize(T obj) {
Class<T> clazz = (Class<T>) obj.getClass();
+ LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
Schema<T> schema = getSchema(clazz);
byte[] data;
try {
- data = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
+ data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} finally {
- BUFFER.clear();
+ buffer.clear();
}
return data;
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/StorageThreadFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/StorageThreadFactory.java
new file mode 100644
index 000000000..76cbb2435
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/common/StorageThreadFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StorageThreadFactory implements ThreadFactory {
+ private final AtomicInteger poolNumber = new AtomicInteger(1);
+ private final ThreadGroup group;
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ private final String namePrefix;
+
+ public StorageThreadFactory() {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+ namePrefix = "StorageThread-" + poolNumber.getAndIncrement() + "-thread-";
+ }
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
+ if (thread.isDaemon()) {
+ thread.setDaemon(false);
+ }
+ if (thread.getPriority() != Thread.NORM_PRIORITY) {
+ thread.setPriority(Thread.NORM_PRIORITY);
+ }
+ return thread;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/constants/StorageConstants.java
similarity index 52%
copy from seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
copy to seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/constants/StorageConstants.java
index 0723b5d84..7ce1957e5 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/constants/StorageConstants.java
@@ -18,30 +18,12 @@
*
*/
-package org.apache.seatunnel.engine.checkpoint.storage.api;
+package org.apache.seatunnel.engine.checkpoint.storage.constants;
-import java.util.Map;
-
-/**
- * All checkpoint storage plugins need to implement it
- */
-public interface CheckpointStorageFactory {
-
- /**
- * Returns the name of the storage plugin
- */
- String name();
+public class StorageConstants {
/**
- * create storage plugin instance
- *
- * @param configuration storage system config params
- * key: storage system config key
- * value: storage system config value
- * e.g.
- * key: "FS_DEFAULT_NAME_KEY"
- * value: "fs.defaultFS"
- * return storage plugin instance
+ * The name of the configuration property that specifies the name of the file system.
*/
- CheckpointStorage create(Map<String, String> configuration);
+ public static final String STORAGE_NAME_SPACE = "storageNameSpace";
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
similarity index 70%
copy from seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
copy to seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
index e6a13657e..1a67ea683 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/pom.xml
@@ -28,21 +28,24 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>checkpoint-storage-local-file</artifactId>
+ <artifactId>checkpoint-storage-hdfs</artifactId>
<dependencies>
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>checkpoint-storage-api</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
+ <version>${flink-shaded-hadoop-2.version}</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <!--test-->
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
+
</project>
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
similarity index 51%
copy from seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
copy to seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
index 0723b5d84..d054d1611 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/CheckpointStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsConstants.java
@@ -18,30 +18,13 @@
*
*/
-package org.apache.seatunnel.engine.checkpoint.storage.api;
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
-import java.util.Map;
+public class HdfsConstants {
-/**
- * All checkpoint storage plugins need to implement it
- */
-public interface CheckpointStorageFactory {
+ public static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
- /**
- * Returns the name of the storage plugin
- */
- String name();
+ public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
- /**
- * create storage plugin instance
- *
- * @param configuration storage system config params
- * key: storage system config key
- * value: storage system config value
- * e.g.
- * key: "FS_DEFAULT_NAME_KEY"
- * value: "fs.defaultFS"
- * return storage plugin instance
- */
- CheckpointStorage create(Map<String, String> configuration);
+ public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
new file mode 100644
index 000000000..56961492f
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
+
+import static org.apache.seatunnel.engine.checkpoint.storage.constants.StorageConstants.STORAGE_NAME_SPACE;
+
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
+import org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckpointStorage;
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Slf4j
+public class HdfsStorage extends AbstractCheckpointStorage {
+
+ public FileSystem fs;
+ private static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
+
+ private static final String KERBEROS_KEY = "kerberos";
+
+ private static final String STORAGE_TMP_SUFFIX = "tmp";
+
+ public HdfsStorage(Map<String, String> configuration) throws CheckpointStorageException {
+ this.initStorage(configuration);
+ }
+
+ @Override
+ public void initStorage(Map<String, String> configuration) throws CheckpointStorageException {
+ Configuration hadoopConf = new Configuration();
+ if (configuration.containsKey(HdfsConstants.HDFS_DEF_FS_NAME)) {
+ hadoopConf.set(HdfsConstants.HDFS_DEF_FS_NAME, configuration.get(HdfsConstants.HDFS_DEF_FS_NAME));
+ }
+ if (StringUtils.isNotBlank(configuration.get(STORAGE_NAME_SPACE))) {
+ setStorageNameSpace(configuration.get(STORAGE_NAME_SPACE));
+ }
+ // todo support other config configurations
+ if (configuration.containsKey(HdfsConstants.KERBEROS_PRINCIPAL) && configuration.containsKey(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH)) {
+ String kerberosPrincipal = configuration.get(HdfsConstants.KERBEROS_PRINCIPAL);
+ String kerberosKeytabFilePath = configuration.get(HdfsConstants.KERBEROS_KEYTAB_FILE_PATH);
+ if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
+ hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, KERBEROS_KEY);
+ authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
+ }
+ }
+ JobConf jobConf = new JobConf(hadoopConf);
+ try {
+ fs = FileSystem.get(jobConf);
+ } catch (IOException e) {
+ throw new CheckpointStorageException("Failed to get file system", e);
+ }
+
+ }
+
+ @Override
+ public String storeCheckPoint(PipelineState state) throws CheckpointStorageException {
+ byte[] datas;
+ try {
+ datas = serializeCheckPointData(state);
+ } catch (IOException e) {
+ throw new CheckpointStorageException("Failed to serialize checkpoint data,state is :" + state, e);
+ }
+ Path filePath = new Path(getStorageParentDirectory() + state.getJobId() + "/" + getCheckPointName(state));
+
+ Path tmpFilePath = new Path(getStorageParentDirectory() + state.getJobId() + "/" + getCheckPointName(state) + STORAGE_TMP_SUFFIX);
+ try (FSDataOutputStream out = fs.create(tmpFilePath)) {
+ out.write(datas);
+ out.hsync();
+ fs.rename(tmpFilePath, filePath);
+ } catch (IOException e) {
+ throw new CheckpointStorageException("Failed to write checkpoint data, state: " + state, e);
+ } finally {
+ try {
+ // clean up tmp file, if still lying around
+ if (fs.exists(tmpFilePath)) {
+ fs.delete(tmpFilePath, false);
+ }
+ } catch (IOException ioe) {
+ log.error("Failed to delete tmp file", ioe);
+ }
+ }
+ return filePath.getName();
+ }
+
+ @Override
+ public List<PipelineState> getAllCheckpoints(String jobId) throws CheckpointStorageException {
+ String path = getStorageParentDirectory() + jobId;
+ List<String> fileNames = getFileNames(path);
+ if (fileNames.isEmpty()) {
+ throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+ }
+ List<PipelineState> states = new ArrayList<>();
+ fileNames.forEach(file -> {
+ try {
+ states.add(readPipelineState(file, jobId));
+ } catch (CheckpointStorageException e) {
+ log.error("Failed to read checkpoint data from file: " + file, e);
+ }
+ });
+ if (states.isEmpty()) {
+ throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+ }
+ return states;
+ }
+
+ @Override
+ public List<PipelineState> getLatestCheckpoint(String jobId) throws CheckpointStorageException {
+ String path = getStorageParentDirectory() + jobId;
+ List<String> fileNames = getFileNames(path);
+ if (fileNames.isEmpty()) {
+ throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+ }
+ Set<String> latestPipelineNames = getLatestPipelineNames(fileNames);
+ List<PipelineState> latestPipelineStates = new ArrayList<>();
+ latestPipelineNames.forEach(fileName -> {
+ try {
+ latestPipelineStates.add(readPipelineState(fileName, jobId));
+ } catch (CheckpointStorageException e) {
+ log.error("Failed to read pipeline state for file: {}", fileName, e);
+ }
+ });
+
+ if (latestPipelineStates.isEmpty()) {
+ throw new CheckpointStorageException("No checkpoint found for job, job id:{} " + jobId);
+ }
+ return latestPipelineStates;
+ }
+
+ @Override
+ public PipelineState getLatestCheckpointByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException {
+ String path = getStorageParentDirectory() + jobId;
+ List<String> fileNames = getFileNames(path);
+ if (fileNames.isEmpty()) {
+ throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+ }
+
+ String latestFileName = getLatestCheckpointFileNameByJobIdAndPipelineId(fileNames, pipelineId);
+ if (latestFileName == null) {
+ throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId + ", pipeline id is: " + pipelineId);
+ }
+ return readPipelineState(latestFileName, jobId);
+ }
+
+ @Override
+ public List<PipelineState> getCheckpointsByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException {
+ String path = getStorageParentDirectory() + jobId;
+ List<String> fileNames = getFileNames(path);
+ if (fileNames.isEmpty()) {
+ throw new CheckpointStorageException("No checkpoint found for job, job id is: " + jobId);
+ }
+
+ List<PipelineState> pipelineStates = new ArrayList<>();
+ fileNames.forEach(file -> {
+ String filePipelineId = file.split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+ if (pipelineId.equals(filePipelineId)) {
+ try {
+ pipelineStates.add(readPipelineState(file, jobId));
+ } catch (Exception e) {
+ log.error("Failed to read checkpoint data from file " + file, e);
+ }
+ }
+ });
+ return pipelineStates;
+ }
+
+ @Override
+ public void deleteCheckpoint(String jobId) {
+ String jobPath = getStorageParentDirectory() + jobId;
+ try {
+ fs.delete(new Path(jobPath), true);
+ } catch (IOException e) {
+ log.error("Failed to delete checkpoint for job {}", jobId, e);
+ }
+ }
+
+ /**
+ * Authenticate kerberos
+ *
+ * @param kerberosPrincipal kerberos principal
+ * @param kerberosKeytabFilePath kerberos keytab file path
+ * @param hdfsConf hdfs configuration
+ * @throws CheckpointStorageException authentication exception
+ */
+ private void authenticateKerberos(String kerberosPrincipal, String kerberosKeytabFilePath, Configuration hdfsConf) throws CheckpointStorageException {
+ UserGroupInformation.setConfiguration(hdfsConf);
+ try {
+ UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
+ } catch (IOException e) {
+ throw new CheckpointStorageException("Failed to login user from keytab : " + kerberosKeytabFilePath + " and kerberos principal : " + kerberosPrincipal, e);
+ }
+ }
+
+ private List<String> getFileNames(String path) throws CheckpointStorageException {
+ try {
+
+ RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator = fs.listFiles(new Path(path), false);
+ List<String> fileNames = new ArrayList<>();
+ while (fileStatusRemoteIterator.hasNext()) {
+ LocatedFileStatus fileStatus = fileStatusRemoteIterator.next();
+ if (!fileStatus.getPath().getName().endsWith(FILE_FORMAT)) {
+ fileNames.add(fileStatus.getPath().getName());
+ }
+ fileNames.add(fileStatus.getPath().getName());
+ }
+ return fileNames;
+ } catch (IOException e) {
+ throw new CheckpointStorageException("Failed to list files from names" + path, e);
+ }
+ }
+
+ /**
+ * Get checkpoint name
+ *
+ * @param fileName file name
+ * @return checkpoint data
+ */
+ private PipelineState readPipelineState(String fileName, String jobId) throws CheckpointStorageException {
+ fileName = getStorageParentDirectory() + jobId + DEFAULT_CHECKPOINT_FILE_PATH_SPLIT + fileName;
+ try (FSDataInputStream in = fs.open(new Path(fileName))) {
+ byte[] datas = new byte[in.available()];
+ in.read(datas);
+ return deserializeCheckPointData(datas);
+ } catch (IOException e) {
+ throw new CheckpointStorageException(String.format("Failed to read checkpoint data, file name is %s,job id is %s", fileName, jobId), e);
+ }
+ }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
similarity index 75%
copy from seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
copy to seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
index 9c8f4ad07..ccb96c9b6 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorageFactory.java
@@ -18,29 +18,25 @@
*
*/
-package org.apache.seatunnel.engine.checkpoint.storage.localfile;
+package org.apache.seatunnel.engine.checkpoint.storage.hdfs;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import com.google.auto.service.AutoService;
import java.util.Map;
-/**
- * Local file storage plug-in, use local file storage,
- * only suitable for single-machine testing or small data scale use, use with caution in production environment
- */
@AutoService(CheckpointStorageFactory.class)
-public class LocalFileStorageFactory implements CheckpointStorageFactory {
-
+public class HdfsStorageFactory implements CheckpointStorageFactory {
@Override
public String name() {
- return "localfile";
+ return "hdfs";
}
@Override
- public CheckpointStorage create(Map<String, String> configuration) {
- return new LocalFileStorage();
+ public CheckpointStorage create(Map<String, String> configuration) throws CheckpointStorageException {
+ return new HdfsStorage(configuration);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
index e6a13657e..eb618ffaa 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/pom.xml
@@ -30,15 +30,18 @@
<artifactId>checkpoint-storage-local-file</artifactId>
<dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>checkpoint-storage-api</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ </dependency>
<!--test-->
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
index cb4bcf63c..b0a22c4a8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorage.java
@@ -20,35 +20,60 @@
package org.apache.seatunnel.engine.checkpoint.storage.localfile;
+import static org.apache.seatunnel.engine.checkpoint.storage.constants.StorageConstants.STORAGE_NAME_SPACE;
+
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.AbstractCheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
@Slf4j
public class LocalFileStorage extends AbstractCheckpointStorage {
private static final String[] FILE_EXTENSIONS = new String[]{FILE_FORMAT};
- public LocalFileStorage() {
- // Nothing to do
+ private static final String DEFAULT_WINDOWS_OS_NAME_SPACE = "C:\\ProgramData\\seatunnel\\checkpoint\\";
+
+ private static final String DEFAULT_LINUX_OS_NAME_SPACE = "/tmp/seatunnel/checkpoint/";
+
+ public LocalFileStorage(Map<String, String> configuration) {
+ initStorage(configuration);
}
@Override
- public void initStorage(Map<String, String> configuration) throws CheckpointStorageException {
- // Nothing to do
+ public void initStorage(Map<String, String> configuration) {
+ if (MapUtils.isEmpty(configuration)) {
+ setDefaultStorageSpaceByOSName();
+ return;
+ }
+ if (StringUtils.isNotBlank(configuration.get(STORAGE_NAME_SPACE))) {
+ setStorageNameSpace(configuration.get(STORAGE_NAME_SPACE));
+ }
+ }
+
+ /**
+ * Set the default storage root directory according to the operating system (Windows vs. others)
+ */
+ private void setDefaultStorageSpaceByOSName() {
+ if (System.getProperty("os.name").toLowerCase().contains("windows")) {
+ setStorageNameSpace(DEFAULT_WINDOWS_OS_NAME_SPACE);
+ } else {
+ setStorageNameSpace(DEFAULT_LINUX_OS_NAME_SPACE);
+ }
}
@Override
@@ -60,7 +85,7 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
throw new CheckpointStorageException("Failed to serialize checkpoint data", e);
}
//Consider file paths for different operating systems
- String fileName = getStorageParentDirectory() + state.getJobId() + "/" + getCheckPointName(state);
+ String fileName = getStorageParentDirectory() + state.getJobId() + File.separator + getCheckPointName(state);
File file = new File(fileName);
try {
@@ -107,27 +132,20 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
if (fileList.isEmpty()) {
throw new CheckpointStorageException("No checkpoint found for job " + jobId);
}
- Map<String, File> latestPipelineMap = new HashMap<>();
+ List<String> fileNames = fileList.stream().map(File::getName).collect(Collectors.toList());
+ Set<String> latestPipelines = getLatestPipelineNames(fileNames);
+ List<PipelineState> latestPipelineFiles = new ArrayList<>(latestPipelines.size());
fileList.forEach(file -> {
- String filePipelineId = file.getName().split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
- int fileVersion = Integer.parseInt(file.getName().split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
- if (latestPipelineMap.containsKey(filePipelineId)) {
- int oldVersion = Integer.parseInt(latestPipelineMap.get(filePipelineId).getName().split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
- if (fileVersion > oldVersion) {
- latestPipelineMap.put(filePipelineId, file);
+ String fileName = file.getName();
+ if (latestPipelines.contains(fileName)) {
+ try {
+ byte[] data = FileUtils.readFileToByteArray(file);
+ latestPipelineFiles.add(deserializeCheckPointData(data));
+ } catch (IOException e) {
+ log.error("Failed to read checkpoint data from file " + file.getAbsolutePath(), e);
}
- } else {
- latestPipelineMap.put(filePipelineId, file);
- }
- });
- List<PipelineState> latestPipelineFiles = new ArrayList<>(latestPipelineMap.size());
- latestPipelineMap.values().forEach(file -> {
- try {
- byte[] data = FileUtils.readFileToByteArray(file);
- latestPipelineFiles.add(deserializeCheckPointData(data));
- } catch (IOException e) {
- log.error("Failed to read checkpoint data from file " + file.getAbsolutePath(), e);
}
+
});
if (latestPipelineFiles.isEmpty()) {
throw new CheckpointStorageException("Failed to read checkpoint data from file");
@@ -138,32 +156,32 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
@Override
public PipelineState getLatestCheckpointByJobIdAndPipelineId(String jobId, String pipelineId) throws CheckpointStorageException {
- Collection<File> fileList = FileUtils.listFiles(new File(getStorageParentDirectory() + jobId), FILE_EXTENSIONS, false);
+ String parentPath = getStorageParentDirectory() + jobId;
+ Collection<File> fileList = FileUtils.listFiles(new File(parentPath), FILE_EXTENSIONS, false);
if (fileList.isEmpty()) {
throw new CheckpointStorageException("No checkpoint found for job " + jobId);
}
+ List<String> fileNames = fileList.stream().map(File::getName).collect(Collectors.toList());
+
+ String latestFileName = getLatestCheckpointFileNameByJobIdAndPipelineId(fileNames, pipelineId);
- AtomicReference<File> latestFile = new AtomicReference<>();
- AtomicInteger latestVersion = new AtomicInteger();
+ AtomicReference<PipelineState> latestFile = new AtomicReference<>(null);
fileList.forEach(file -> {
- int fileVersion = Integer.parseInt(file.getName().split(FILE_NAME_SPLIT)[FILE_SORT_ID_INDEX]);
- String filePipelineId = file.getName().split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
- if (pipelineId.equals(filePipelineId) && fileVersion > latestVersion.get()) {
- latestVersion.set(fileVersion);
- latestFile.set(file);
+ String fileName = file.getName();
+ if (fileName.equals(latestFileName)) {
+ try {
+ byte[] data = FileUtils.readFileToByteArray(file);
+ latestFile.set(deserializeCheckPointData(data));
+ } catch (IOException e) {
+ log.error("read checkpoint data from file " + file.getAbsolutePath(), e);
+ }
}
});
- if (latestFile.get().exists()) {
- try {
- byte[] data = FileUtils.readFileToByteArray(latestFile.get());
- return deserializeCheckPointData(data);
- } catch (IOException e) {
- throw new CheckpointStorageException("Failed to read checkpoint data from file " + latestFile.get().getAbsolutePath(), e);
- }
-
+ if (latestFile.get() == null) {
+ throw new CheckpointStorageException("Failed to read checkpoint data from file, file name " + latestFileName);
}
- return null;
+ return latestFile.get();
}
@Override
@@ -174,7 +192,7 @@ public class LocalFileStorage extends AbstractCheckpointStorage {
}
List<PipelineState> pipelineStates = new ArrayList<>();
fileList.forEach(file -> {
- String filePipelineId = file.getName().split(FILE_NAME_SPLIT)[FILE_NAME_PIPELINE_ID_INDEX];
+ String filePipelineId = getPipelineIdByFileName(file.getName());
if (pipelineId.equals(filePipelineId)) {
try {
byte[] data = FileUtils.readFileToByteArray(file);
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
index 9c8f4ad07..bc7ec95f2 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageFactory.java
@@ -41,6 +41,6 @@ public class LocalFileStorageFactory implements CheckpointStorageFactory {
@Override
public CheckpointStorage create(Map<String, String> configuration) {
- return new LocalFileStorage();
+ return new LocalFileStorage(configuration);
}
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageTest.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageTest.java
index 40002cc17..9371662a2 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageTest.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-local-file/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/localfile/LocalFileStorageTest.java
@@ -37,7 +37,7 @@ import java.util.List;
@EnabledOnOs({LINUX, MAC})
public class LocalFileStorageTest {
- private static LocalFileStorage STORAGE = new LocalFileStorage();
+ private static LocalFileStorage STORAGE = new LocalFileStorage(null);
private static final String JOB_ID = "chris";
@Before
@@ -60,7 +60,7 @@ public class LocalFileStorageTest {
public void testGetAllCheckpoints() throws CheckpointStorageException {
List<PipelineState> pipelineStates = STORAGE.getAllCheckpoints(JOB_ID);
- Assertions.assertEquals(4, pipelineStates.size());
+ Assertions.assertEquals(3, pipelineStates.size());
}
@Test
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/pom.xml b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/pom.xml
index 0fa871a93..5f8e88159 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/pom.xml
@@ -31,6 +31,11 @@
<artifactId>checkpoint-storage-plugins</artifactId>
<packaging>pom</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>checkpoint-storage-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
@@ -39,6 +44,7 @@
<modules>
<module>checkpoint-storage-local-file</module>
+ <module>checkpoint-storage-hdfs</module>
</modules>
</project>
\ No newline at end of file