You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/06/21 07:23:56 UTC
[hudi] 01/02: [HUDI-3475] Initialize hudi table management module.
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4640a3bbb8e212030f94848a0112784d98772de8
Author: 喻兆靖 <yu...@bytedance.com>
AuthorDate: Wed Jun 8 09:54:31 2022 +0800
[HUDI-3475] Initialize hudi table management module.
---
.../cluster/ClusteringPlanActionExecutor.java | 20 ++
.../apache/hudi/client/SparkRDDWriteClient.java | 4 +
hudi-table-management-service/pom.xml | 333 +++++++++++++++++++++
.../hudi/table/management/RequestHandler.java | 194 ++++++++++++
.../table/management/TableManagementServer.java | 154 ++++++++++
.../hudi/table/management/common/EnvConstant.java | 29 ++
.../table/management/common/ServiceConfig.java | 117 ++++++++
.../table/management/common/ServiceContext.java | 78 +++++
.../common/TableManagementServiceConfig.java | 29 ++
.../hudi/table/management/entity/Action.java | 52 ++++
.../table/management/entity/AssistQueryEntity.java | 47 +++
.../hudi/table/management/entity/Engine.java | 44 +++
.../hudi/table/management/entity/Instance.java | 92 ++++++
.../table/management/entity/InstanceStatus.java | 61 ++++
.../exception/HoodieTableManagementException.java | 32 ++
.../management/executor/BaseActionExecutor.java | 102 +++++++
.../management/executor/CompactionExecutor.java | 59 ++++
.../executor/submitter/ExecutionEngine.java | 82 +++++
.../management/executor/submitter/SparkEngine.java | 220 ++++++++++++++
.../table/management/handlers/ActionHandler.java | 71 +++++
.../management/handlers/ClusteringHandler.java | 51 ++++
.../management/handlers/CompactionHandler.java | 66 ++++
.../hudi/table/management/service/BaseService.java | 29 ++
.../table/management/service/CleanService.java | 78 +++++
.../table/management/service/ExecutorService.java | 102 +++++++
.../table/management/service/MonitorService.java | 65 ++++
.../table/management/service/RetryService.java | 81 +++++
.../table/management/service/ScheduleService.java | 116 +++++++
.../hudi/table/management/store/MetadataStore.java | 41 +++
.../store/impl/RelationDBBasedStore.java | 70 +++++
.../store/jdbc/HikariDataSourceFactory.java | 38 +++
.../table/management/store/jdbc/InstanceDao.java | 156 ++++++++++
.../store/jdbc/SqlSessionFactoryUtil.java | 82 +++++
.../hudi/table/management/util/DateTimeUtils.java | 32 ++
.../hudi/table/management/util/InstanceUtil.java | 34 +++
.../src/main/resources/hikariPool.properties | 20 ++
.../src/main/resources/logback.xml | 41 +++
.../src/main/resources/mybatis-config.xml | 42 +++
.../src/main/resources/mybatis/Instance.xml | 165 ++++++++++
.../main/resources/table-management-service.sql | 46 +++
.../test/resources/log4j-surefire-quiet.properties | 29 ++
.../src/test/resources/log4j-surefire.properties | 30 ++
pom.xml | 1 +
43 files changed, 3235 insertions(+)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 15ead5efb0..94ba014c4c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -21,8 +21,10 @@ package org.apache.hudi.table.action.cluster;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -39,7 +41,9 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {
@@ -102,6 +106,22 @@ public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O
throw new HoodieIOException("Exception scheduling clustering", ioe);
}
}
+
+ if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.replacecommit.name())) {
+ submitClusteringToService();
+ }
+
return planOption;
}
+
+  /**
+   * Submits the timestamps of all pending replacecommit instants of this table to the
+   * table management service.
+   */
+  private void submitClusteringToService() {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    // Reload so the clustering instant saved just before this call is visible:
+    // getActiveTimeline() may return a timeline that was cached before that instant was written.
+    // NOTE(review): the pending replace timeline can also hold non-clustering replacecommits
+    // (e.g. insert_overwrite); confirm the service tolerates / filters them.
+    List<String> instantsToSubmit = metaClient.reloadActiveTimeline()
+        .filterPendingReplaceTimeline()
+        .getInstants()
+        .map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList());
+    HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(metaClient, config.getTableManagerConfig());
+    tableManagerClient.submitClustering(instantsToSubmit);
+  }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index bdf478a8f6..df38ba4a19 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -333,6 +333,10 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
+ // do not compact a complete instant.
+ if (table.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
+ return null;
+ }
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
diff --git a/hudi-table-management-service/pom.xml b/hudi-table-management-service/pom.xml
new file mode 100644
index 0000000000..d5251abc4a
--- /dev/null
+++ b/hudi-table-management-service/pom.xml
@@ -0,0 +1,333 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>hudi</artifactId>
+ <groupId>org.apache.hudi</groupId>
+ <version>0.12.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>hudi-table-management-service</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <mybatis.version>3.4.6</mybatis.version>
+ </properties>
+
+ <dependencies>
+ <!-- Hoodie -->
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-cli</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-client-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Spark -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <!-- Fasterxml -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <!-- Httpcomponents -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>fluent-hc</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.javalin</groupId>
+ <artifactId>javalin</artifactId>
+ <version>2.8.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.rocksdb</groupId>
+ <artifactId>rocksdbjni</artifactId>
+ </dependency>
+
+ <!-- Hadoop -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <!-- Need these exclusions to make sure JavaSparkContext can be setup. https://issues.apache.org/jira/browse/SPARK-1693 -->
+ <exclusions>
+ <exclusion>
+ <artifactId>tools</artifactId>
+ <groupId>com.sun</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>tools</artifactId>
+ <groupId>com.sun</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>tools</artifactId>
+ <groupId>com.sun</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-java-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mybatis</groupId>
+ <artifactId>mybatis</artifactId>
+ <version>${mybatis.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.24</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ <version>4.0.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>8.0.23</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.2</version>
+ </dependency>
+
+ <!-- Test -->
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>1.4.200</version>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-common</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-client-common</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>${maven-jar-plugin.version}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ </executions>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>3.2.4</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <transformers>
+                <!-- Entry point used by `java -jar`; must name the real server class of this module. -->
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                  <mainClass>org.apache.hudi.table.management.TableManagementServer</mainClass>
+                </transformer>
+                <!-- Merge META-INF/services files (e.g. Hadoop FileSystem implementations, JDBC drivers)
+                     instead of letting one dependency's copy overwrite the others. -->
+                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+              </transformers>
+              <shadedArtifactAttached>true</shadedArtifactAttached>
+              <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
+              <artifactSet>
+                <excludes>
+                  <exclude>org.slf4j:slf4j-log4j12</exclude>
+                </excludes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <!-- Drop signature files of signed dependencies; they are invalid inside the merged jar. -->
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                    <exclude>META-INF/services/javax.*</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+ </plugins>
+
+    <!-- Only main resources are packaged. src/test/resources (log4j-surefire*.properties) is meant for
+         tests and should stay on the test classpath (Maven's testResources), not in the service jar. -->
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+    </resources>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java
new file mode 100644
index 0000000000..32d2ebbe74
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management;
+
+import org.apache.hudi.table.management.entity.Action;
+import org.apache.hudi.table.management.entity.Engine;
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.entity.InstanceStatus;
+import org.apache.hudi.table.management.handlers.ActionHandler;
+import org.apache.hudi.table.management.store.MetadataStore;
+import org.apache.hudi.table.management.util.InstanceUtil;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.javalin.Context;
+import io.javalin.Handler;
+import io.javalin.Javalin;
+import org.apache.hadoop.conf.Configuration;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Main REST handler of the table management service: registers the HTTP endpoints on a
+ * {@link Javalin} app and delegates compaction / clustering requests to {@link ActionHandler}.
+ */
+public class RequestHandler {
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class);
+
+  private final Javalin app;
+  private final ActionHandler actionHandler;
+
+  /**
+   * @param app           Javalin server the endpoints are registered on
+   * @param conf          Hadoop configuration handed to the {@link ActionHandler}
+   * @param metadataStore metadata store handed to the {@link ActionHandler}
+   * @throws IOException  if the {@link ActionHandler} cannot be created
+   */
+  public RequestHandler(Javalin app,
+                        Configuration conf,
+                        MetadataStore metadataStore) throws IOException {
+    this.app = app;
+    this.actionHandler = new ActionHandler(conf, metadataStore);
+  }
+
+  /**
+   * Registers all endpoints (common, compaction and clustering) on the Javalin app.
+   */
+  public void register() {
+    registerCommonAPI();
+    registerCompactionAPI();
+    registerClusteringAPI();
+  }
+
+  /**
+   * Serializes {@code obj} to JSON and writes it as the response body; the output is
+   * pretty-printed when the request carries a {@code pretty} query parameter.
+   */
+  private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException {
+    boolean prettyPrint = ctx.queryParam("pretty") != null;
+    long beginJsonTs = System.currentTimeMillis();
+    String result =
+        prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj);
+    long endJsonTs = System.currentTimeMillis();
+    LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs));
+    ctx.result(result);
+  }
+
+  /**
+   * Register common API calls.
+   */
+  private void registerCommonAPI() {
+    // NOTE(review): the REGISTER endpoint currently accepts the request and does nothing.
+    app.get(HoodieTableManagerClient.REGISTER, new ViewHandler(ctx -> {
+    }));
+  }
+
+  /**
+   * Register Compaction API calls.
+   */
+  private void registerCompactionAPI() {
+    app.get(HoodieTableManagerClient.SUBMIT_COMPACTION, new ViewHandler(ctx -> {
+      // The instant parameter carries one or more comma-separated instants.
+      for (String instant : requiredParam(ctx, HoodieTableManagerClient.INSTANT_PARAM).split(",")) {
+        Instance instance = buildScheduledInstance(ctx, Action.COMPACTION, instant);
+        InstanceUtil.checkArgument(instance);
+        actionHandler.scheduleCompaction(instance);
+      }
+    }));
+
+    app.get(HoodieTableManagerClient.REMOVE_COMPACTION,
+        new ViewHandler(ctx -> actionHandler.removeCompaction(buildRemovedInstance(ctx))));
+  }
+
+  /**
+   * Register Clustering API calls.
+   */
+  private void registerClusteringAPI() {
+    app.get(HoodieTableManagerClient.SUBMIT_CLUSTERING, new ViewHandler(ctx -> {
+      // Split like the compaction endpoint: clients submit a list of clustering instants at once.
+      for (String instant : requiredParam(ctx, HoodieTableManagerClient.INSTANT_PARAM).split(",")) {
+        Instance instance = buildScheduledInstance(ctx, Action.CLUSTERING, instant);
+        InstanceUtil.checkArgument(instance);
+        actionHandler.scheduleClustering(instance);
+      }
+    }));
+
+    app.get(HoodieTableManagerClient.REMOVE_CLUSTERING,
+        new ViewHandler(ctx -> actionHandler.removeClustering(buildRemovedInstance(ctx))));
+  }
+
+  /**
+   * Returns the value of the query parameter {@code name}, failing the request when it is absent.
+   */
+  private static String requiredParam(Context ctx, String name) {
+    return ctx.validatedQueryParam(name).getOrThrow();
+  }
+
+  /**
+   * Builds a SCHEDULED instance of {@code action} for {@code instant}, taking table, engine and
+   * resource settings from the request's query parameters.
+   */
+  private static Instance buildScheduledInstance(Context ctx, Action action, String instant) {
+    return Instance.builder()
+        .basePath(requiredParam(ctx, HoodieTableManagerClient.BASEPATH_PARAM))
+        .dbName(requiredParam(ctx, HoodieTableManagerClient.DATABASE_NAME_PARAM))
+        .tableName(requiredParam(ctx, HoodieTableManagerClient.TABLE_NAME_PARAM))
+        .action(action.getValue())
+        .instant(instant)
+        .executionEngine(Engine.valueOf(requiredParam(ctx, HoodieTableManagerClient.EXECUTION_ENGINE)))
+        .owner(requiredParam(ctx, HoodieTableManagerClient.USERNAME))
+        .queue(requiredParam(ctx, HoodieTableManagerClient.QUEUE))
+        .resource(requiredParam(ctx, HoodieTableManagerClient.RESOURCE))
+        .parallelism(requiredParam(ctx, HoodieTableManagerClient.PARALLELISM))
+        .status(InstanceStatus.SCHEDULED.getStatus())
+        .build();
+  }
+
+  /**
+   * Builds an INVALID instance flagged as deleted that identifies the table and instant named
+   * by the request; used by the remove endpoints.
+   */
+  private static Instance buildRemovedInstance(Context ctx) {
+    return Instance.builder()
+        .basePath(requiredParam(ctx, HoodieTableManagerClient.BASEPATH_PARAM))
+        .dbName(requiredParam(ctx, HoodieTableManagerClient.DATABASE_NAME_PARAM))
+        .tableName(requiredParam(ctx, HoodieTableManagerClient.TABLE_NAME_PARAM))
+        .instant(requiredParam(ctx, HoodieTableManagerClient.INSTANT_PARAM))
+        .status(InstanceStatus.INVALID.getStatus())
+        .isDeleted(true)
+        .build();
+  }
+
+  /**
+   * Wraps a handler to log its latency, outcome and request details.
+   */
+  private static class ViewHandler implements Handler {
+
+    private final Handler handler;
+
+    ViewHandler(Handler handler) {
+      this.handler = handler;
+    }
+
+    @Override
+    public void handle(@NotNull Context context) throws Exception {
+      boolean success = true;
+      long beginTs = System.currentTimeMillis();
+      // Refresh/final-check phases and syncing are not performed here; they are logged as 0 / false.
+      boolean synced = false;
+      long refreshCheckTimeTaken = 0;
+      long handleTimeTaken = 0;
+      long finalCheckTimeTaken = 0;
+      try {
+        long handleBeginMs = System.currentTimeMillis();
+        handler.handle(context);
+        long handleEndMs = System.currentTimeMillis();
+        handleTimeTaken = handleEndMs - handleBeginMs;
+      } catch (Exception e) {
+        // Handler.handle may throw checked exceptions too; they must be reported as failures.
+        success = false;
+        LOG.error("Got exception servicing request " + context.queryString(), e);
+        throw e;
+      } finally {
+        long endTs = System.currentTimeMillis();
+        long timeTakenMillis = endTs - beginTs;
+        LOG.info(String.format(
+            "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
+                + "Success=%s, Query=%s, Host=%s, synced=%s",
+            timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success,
+            context.queryString(), context.host(), synced));
+      }
+    }
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java
new file mode 100644
index 0000000000..518b9ccff6
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.table.management.common.ServiceConfig;
+import org.apache.hudi.table.management.common.TableManagementServiceConfig;
+import org.apache.hudi.table.management.service.BaseService;
+import org.apache.hudi.table.management.service.CleanService;
+import org.apache.hudi.table.management.service.ExecutorService;
+import org.apache.hudi.table.management.service.MonitorService;
+import org.apache.hudi.table.management.service.RetryService;
+import org.apache.hudi.table.management.service.ScheduleService;
+import org.apache.hudi.table.management.store.MetadataStore;
+
+import com.beust.jcommander.JCommander;
+import io.javalin.Javalin;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A standalone table management service.
+ *
+ * <p>Serves the REST API registered by {@link RequestHandler} and runs the background services
+ * registered in {@code registerService()} (executor, schedule, retry, monitor and clean services).
+ */
+public class TableManagementServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableManagementServer.class);
+
+  private int serverPort;
+  private final Configuration conf;
+  private final TableManagementServiceConfig config;
+  private final transient FileSystem fs;
+  private transient Javalin app = null;
+  private List<BaseService> services;
+  private final MetadataStore metadataStore;
+
+  /**
+   * @param serverPort port the REST server is asked to listen on
+   * @param conf       Hadoop configuration, passed through {@link FSUtils#prepareHadoopConf}
+   * @param config     command line configuration of the service
+   * @throws IOException if the file system cannot be obtained
+   */
+  public TableManagementServer(int serverPort, Configuration conf, TableManagementServiceConfig config)
+      throws IOException {
+    this.config = config;
+    this.conf = FSUtils.prepareHadoopConf(conf);
+    // Build the file system from the prepared configuration, the same one handed to the request handler.
+    this.fs = FileSystem.get(this.conf);
+    this.serverPort = serverPort;
+    this.metadataStore = initMetadataStore();
+  }
+
+  public TableManagementServer(TableManagementServiceConfig config) throws IOException {
+    this(config.serverPort, new Configuration(), config);
+  }
+
+  /**
+   * Starts the REST server, then registers, initializes and starts the background services.
+   *
+   * @return the port the REST server is bound to
+   */
+  public int startService() throws IOException {
+    app = Javalin.create();
+    RequestHandler requestHandler = new RequestHandler(app, conf, metadataStore);
+    app.get("/", ctx -> ctx.result("Hello World"));
+    requestHandler.register();
+    app.start(serverPort);
+    // Report the port actually bound; it differs from the requested one when an ephemeral port (0) is used.
+    serverPort = app.port();
+    registerService();
+    initAndStartRegisterService();
+    return serverPort;
+  }
+
+  /**
+   * Instantiates the metadata store class configured under
+   * {@code ServiceConfVars.MetadataStoreClass} and initializes it.
+   */
+  private MetadataStore initMetadataStore() {
+    String className = ServiceConfig.getInstance()
+        .getString(ServiceConfig.ServiceConfVars.MetadataStoreClass);
+    MetadataStore metadataStore = ReflectionUtils.loadClass(className);
+    metadataStore.init();
+    LOG.info("Finish init metastore: " + className);
+    return metadataStore;
+  }
+
+  /** Creates the background services; the schedule service hands work to the executor service. */
+  private void registerService() {
+    services = new ArrayList<>();
+    ExecutorService executorService = new ExecutorService();
+    services.add(executorService);
+    services.add(new ScheduleService(executorService, metadataStore));
+    services.add(new RetryService(metadataStore));
+    services.add(new MonitorService());
+    services.add(new CleanService());
+  }
+
+  /** Initializes and starts each registered service, in registration order. */
+  private void initAndStartRegisterService() {
+    for (BaseService service : services) {
+      service.init();
+      service.startService();
+    }
+  }
+
+  /** Stops each registered service, in registration order. */
+  private void stopRegisterService() {
+    for (BaseService service : services) {
+      service.stop();
+    }
+  }
+
+  /**
+   * Starts the service and installs a JVM shutdown hook that stops it.
+   */
+  public void run() throws IOException {
+    startService();
+    Runtime.getRuntime()
+        .addShutdownHook(
+            new Thread(
+                () -> {
+                  // Use stderr here since the logger may have been reset by its JVM shutdown hook.
+                  System.err.println(
+                      "*** shutting down Table management service since JVM is shutting down");
+                  try {
+                    TableManagementServer.this.stop();
+                  } catch (InterruptedException e) {
+                    e.printStackTrace(System.err);
+                  }
+                  System.err.println("*** Table management service shut down");
+                }));
+  }
+
+  /**
+   * Stop serving requests and shutdown resources.
+   *
+   * <p>Safe to call before {@link #startService()} or more than once (e.g. an explicit stop
+   * followed by the shutdown hook installed by {@link #run()}).
+   */
+  public void stop() throws InterruptedException {
+    LOG.info("Stopping Table management Service");
+    if (app != null) {
+      app.stop();
+      app = null;
+    }
+    if (services != null) {
+      stopRegisterService();
+      services = null;
+    }
+    LOG.info("Stopped Table management Service");
+  }
+
+  /**
+   * Command line entry point; prints usage and exits with status 1 when {@code help} is requested.
+   */
+  public static void main(String[] args) throws Exception {
+    final TableManagementServiceConfig cfg = new TableManagementServiceConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help) {
+      cmd.usage();
+      System.exit(1);
+    }
+    TableManagementServer service = new TableManagementServer(cfg);
+    service.run();
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java
new file mode 100644
index 0000000000..c33e88b103
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.common;
+
+/**
+ * Names of environment variables used by the table management service.
+ */
+public class EnvConstant {
+
+  // JVM
+  public static final String JAVA_HOME = "JAVA_HOME";
+
+  // Hadoop / YARN
+  public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
+  public static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";
+  public static final String YARN_CONF_DIR = "YARN_CONF_DIR";
+
+  // Spark
+  public static final String SPARK_HOME = "SPARK_HOME";
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java
new file mode 100644
index 0000000000..d60cf05bee
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Process-wide configuration of the table management service.
+ *
+ * <p>Every entry defaults to {@link ServiceConfVars#defVal()} and can be overridden by an
+ * environment variable whose name starts with {@code HOODIE_} (case-insensitive): the rest of the
+ * name is lower-cased, underscores become dots and the result is appended to
+ * {@code hoodie.table.management.}. For example {@code HOODIE_SPARK_HOME} sets
+ * {@code hoodie.table.management.spark.home}.
+ */
+public class ServiceConfig extends Properties {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ServiceConfig.class);
+
+  /** Lower-case prefix marking environment variables that are imported as config entries. */
+  private static final String HOODIE_ENV_PROPS_PREFIX = "hoodie_";
+
+  /** Prefix shared by all keys in {@link ServiceConfVars}. */
+  private static final String CONFIG_KEY_PREFIX = "hoodie.table.management.";
+
+  // Declared after LOG because the constructor logs while the class is being initialized.
+  private static final ServiceConfig CONFIG = new ServiceConfig();
+
+  /**
+   * Builds the singleton, importing every {@code HOODIE_*} environment variable.
+   */
+  private ServiceConfig() {
+    LOG.info("Start init ServiceConfig");
+    for (Map.Entry<String, String> env : System.getenv().entrySet()) {
+      // Locale.ROOT keeps the match stable under locales with special casing (e.g. Turkish 'I').
+      String envName = env.getKey().toLowerCase(Locale.ROOT);
+      if (envName.startsWith(HOODIE_ENV_PROPS_PREFIX)) {
+        String key = toConfigKey(envName);
+        String value = env.getValue().trim();
+        setProperty(key, value);
+        LOG.info("Set property {} to {}", key, value);
+      }
+    }
+    LOG.info("Finish init ServiceConfig");
+  }
+
+  /**
+   * Maps a lower-cased {@code hoodie_*} environment variable name to its config key, e.g.
+   * {@code hoodie_spark_home} to {@code hoodie.table.management.spark.home}.
+   */
+  private static String toConfigKey(String lowerCaseEnvName) {
+    String suffix = lowerCaseEnvName.substring(HOODIE_ENV_PROPS_PREFIX.length());
+    return CONFIG_KEY_PREFIX + suffix.replace('_', '.');
+  }
+
+  /** Returns the configured value of {@code confVars}, or its default when it is not set. */
+  public String getString(ServiceConfVars confVars) {
+    return getProperty(confVars.key(), confVars.defVal());
+  }
+
+  /** Overrides the value of {@code confVars} in this process. */
+  public void setString(ServiceConfVars confVars, String value) {
+    setProperty(confVars.key(), value);
+  }
+
+  /** Returns the value of {@code confVars} parsed with {@link Boolean#valueOf(String)}. */
+  public Boolean getBool(ServiceConfVars confVars) {
+    return Boolean.valueOf(getString(confVars));
+  }
+
+  /**
+   * Returns the value of {@code confVars} parsed as an int.
+   *
+   * @throws NumberFormatException if the configured value is not an integer
+   */
+  public int getInt(ServiceConfVars confVars) {
+    return Integer.parseInt(getString(confVars));
+  }
+
+  /** Returns the process-wide configuration. */
+  public static ServiceConfig getInstance() {
+    return CONFIG;
+  }
+
+  /**
+   * Configuration entries with their property key and default value.
+   */
+  public enum ServiceConfVars {
+    JavaHome("hoodie.table.management.java.home", ""),
+    SparkHome("hoodie.table.management.spark.home", ""),
+    YarnConfDir("hoodie.table.management.yarn.conf.dir", ""),
+    HadoopConfDir("hoodie.table.management.hadoop.conf.dir", ""),
+    CompactionMainClass("hoodie.table.management.compaction.main.class", "org.apache.hudi.utilities.HoodieCompactor"),
+    CompactionScheduleWaitInterval("hoodie.table.management.schedule.wait.interval", "30000"),
+    IntraMaxFailTolerance("hoodie.table.management.max.fail.tolerance", "5"),
+    MaxRetryNum("hoodie.table.management.instance.max.retry", "3"),
+    MetadataStoreClass("hoodie.table.management.metadata.store.class",
+        "org.apache.hudi.table.management.store.impl.RelationDBBasedStore"),
+    CompactionCacheEnable("hoodie.table.management.compaction.cache.enable", "true"),
+    RetryTimes("hoodie.table.management.retry.times", "5"),
+    SparkSubmitJarPath("hoodie.table.management.submit.jar.path", "/tmp/hoodie_submit_jar/spark/"),
+    SparkShuffleHdfsEnabled("hoodie.table.management.spark.shuffle.hdfs.enabled", "true"),
+    SparkParallelism("hoodie.table.management.spark.parallelism", "1"),
+    SparkMaster("hoodie.table.management.spark.master", "local[1]"),
+    SparkVcoreBoost("hoodie.table.management.spark.vcore.boost", "1"),
+    SparkVcoreBoostRatio("hoodie.table.management.spark.vcore.boost.ratio", "1"),
+    SparkSpeculation("hoodie.table.management.spark.speculation", "false"),
+    ExecutorMemory("hoodie.table.management.executor.memory", "20g"),
+    DriverMemory("hoodie.table.management.driver.memory", "20g"),
+    ExecutorMemoryOverhead("hoodie.table.management.executor.memory.overhead", "5g"),
+    ExecutorCores("hoodie.table.management.executor.cores", "1"),
+    MinExecutors("hoodie.table.management.min.executors", "5"),
+    MaxExecutors("hoodie.table.management.max.executors", "1000"),
+    CoreExecuteSize("hoodie.table.management.core.executor.pool.size", "300"),
+    MaxExecuteSize("hoodie.table.management.max.executor.pool.size", "1000");
+
+    private final String key;
+    private final String defaultVal;
+
+    ServiceConfVars(String key, String defaultVal) {
+      this.key = key;
+      this.defaultVal = defaultVal;
+    }
+
+    /** Returns the property key of this entry. */
+    public String key() {
+      return this.key;
+    }
+
+    /** Returns the value used when the key is not set. */
+    public String defVal() {
+      return this.defaultVal;
+    }
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java
new file mode 100644
index 0000000000..cd5240da36
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.common;
+
+import org.apache.hudi.table.management.store.jdbc.InstanceDao;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Process-wide registry of running and pending instances plus the shared {@link InstanceDao}.
+ *
+ * <p>State lives in static {@link ConcurrentHashMap}s, so each individual operation is safe to
+ * call from concurrent threads.
+ */
+public class ServiceContext {
+
+  /** Running instance identifier mapped to a description of the thread executing it. */
+  private static final ConcurrentHashMap<String, String> RUNNING_INSTANCES = new ConcurrentHashMap<>();
+
+  /** Pending instant key mapped to the {@link System#currentTimeMillis()} of its last refresh. */
+  private static final ConcurrentHashMap<String, Long> PENDING_INSTANCES = new ConcurrentHashMap<>();
+
+  private ServiceContext() {
+    // Static holder, not meant to be instantiated.
+  }
+
+  /** Records that {@code instanceIdentifier} is running on {@code threadIdentifier}. */
+  public static void addRunningInstance(String instanceIdentifier, String threadIdentifier) {
+    RUNNING_INSTANCES.put(instanceIdentifier, threadIdentifier);
+  }
+
+  /** Unregisters a running instance; no-op if it is not registered. */
+  public static void removeRunningInstance(String instanceIdentifier) {
+    RUNNING_INSTANCES.remove(instanceIdentifier);
+  }
+
+  /** Returns the number of instances currently registered as running. */
+  public static int getRunningInstanceNum() {
+    return RUNNING_INSTANCES.size();
+  }
+
+  /** Returns one human-readable line per running instance. */
+  public static List<String> getRunningInstanceInfo() {
+    List<String> runningInfos = new ArrayList<>(RUNNING_INSTANCES.size());
+    for (Map.Entry<String, String> instance : RUNNING_INSTANCES.entrySet()) {
+      runningInfos.add("instance " + instance.getKey() + " execution on " + instance.getValue());
+    }
+    return runningInfos;
+  }
+
+  /** Returns whether {@code key} is tracked as a pending instant. */
+  public static boolean containsPendingInstant(String key) {
+    return PENDING_INSTANCES.containsKey(key);
+  }
+
+  /** Tracks {@code key} as pending, stamping it with the current time. */
+  public static void refreshPendingInstant(String key) {
+    PENDING_INSTANCES.put(key, System.currentTimeMillis());
+  }
+
+  /** Stops tracking {@code key} as a pending instant. */
+  public static void removePendingInstant(String key) {
+    PENDING_INSTANCES.remove(key);
+  }
+
+  /**
+   * Returns the live map of pending instants; changes made through it are visible to every caller.
+   */
+  public static ConcurrentHashMap<String, Long> getPendingInstances() {
+    return PENDING_INSTANCES;
+  }
+
+  /** Returns the shared {@link InstanceDao}, created on first use. */
+  public static InstanceDao getInstanceDao() {
+    return ServiceContextHolder.INSTANCE_DAO;
+  }
+
+  /** Lazy-initialization holder: the DAO is built when this class is first accessed. */
+  private static class ServiceContextHolder {
+    private static final InstanceDao INSTANCE_DAO = new InstanceDao();
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java
new file mode 100644
index 0000000000..c756631f05
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.common;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command-line options of the table management server, bound by JCommander.
+ */
+public class TableManagementServiceConfig {
+
+  @Parameter(names = {"--server-port", "-p"}, description = "Server Port")
+  public Integer serverPort = 26755;
+
+  // help = true lets JCommander parse --help without validating required options.
+  @Parameter(names = {"--help", "-h"}, description = "Print this usage message", help = true)
+  public Boolean help = false;
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java
new file mode 100644
index 0000000000..e84f75666a
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.entity;
+
+/**
+ * Table service actions, identified by the integer code stored in {@code Instance.action}.
+ */
+public enum Action {
+  COMPACTION(0),
+  CLUSTERING(1);
+
+  private final int value;
+
+  Action(int value) {
+    this.value = value;
+  }
+
+  /** Returns the integer code of this action. */
+  public int getValue() {
+    return this.value;
+  }
+
+  /**
+   * Checks that the action code of {@code instance} belongs to a known action.
+   *
+   * @throws RuntimeException if the code is unknown
+   */
+  public static void checkActionType(Instance instance) {
+    if (findByValue(instance.getAction()) == null) {
+      throw new RuntimeException("Invalid action type: " + instance);
+    }
+  }
+
+  /**
+   * Resolves an integer action code.
+   *
+   * @throws RuntimeException if no action has code {@code actionValue}
+   */
+  public static Action getAction(int actionValue) {
+    Action match = findByValue(actionValue);
+    if (match == null) {
+      throw new RuntimeException("Invalid instance action: " + actionValue);
+    }
+    return match;
+  }
+
+  /** Returns the action with code {@code actionValue}, or null if there is none. */
+  private static Action findByValue(int actionValue) {
+    for (Action candidate : values()) {
+      if (candidate.value == actionValue) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java
new file mode 100644
index 0000000000..d7e8b9fe93
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.entity;
+
+import org.apache.hudi.table.management.common.ServiceConfig;
+import org.apache.hudi.table.management.util.DateTimeUtils;
+
+import lombok.Getter;
+
+import java.util.Date;
+
+/**
+ * Parameters of an instance query: status, start time and retry limit.
+ *
+ * <p>NOTE(review): how each field filters results is decided by the store consuming this object.
+ */
+@Getter
+public class AssistQueryEntity {
+
+  /**
+   * Day offset (negated) handed to {@code DateTimeUtils.addDay} for the default start time;
+   * presumably this means "three days ago" — confirm against DateTimeUtils.
+   */
+  private static final int DEFAULT_LOOKBACK_DAYS = 3;
+
+  /** Read from {@code ServiceConfVars.MaxRetryNum} whenever an entity is created. */
+  private int maxRetry = ServiceConfig.getInstance()
+      .getInt(ServiceConfig.ServiceConfVars.MaxRetryNum);
+
+  private Date queryStartTime = DateTimeUtils.addDay(-DEFAULT_LOOKBACK_DAYS);
+
+  private int status;
+
+  /** Uses the configured retry limit, the default start time and status {@code 0}. */
+  public AssistQueryEntity() {
+  }
+
+  /** Uses the configured retry limit with the given status and start time. */
+  public AssistQueryEntity(int status, Date queryStartTime) {
+    this.status = status;
+    this.queryStartTime = queryStartTime;
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java
new file mode 100644
index 0000000000..edcb45c0ae
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.entity;
+
+/**
+ * Execution engines an {@link Instance} can be run with.
+ */
+public enum Engine {
+
+  SPARK(0),
+  FLINK(1);
+
+  private final int value;
+
+  Engine(int value) {
+    this.value = value;
+  }
+
+  /** Returns the integer code of this engine. */
+  public int getValue() {
+    return this.value;
+  }
+
+  /**
+   * Checks that {@code instance} has an execution engine.
+   *
+   * <p>The instance field is already typed as {@link Engine}, so every non-null value is one of
+   * the constants above and only a missing engine can be invalid.
+   *
+   * @throws RuntimeException if the instance has no execution engine
+   */
+  public static void checkEngineType(Instance instance) {
+    Engine engine = instance.getExecutionEngine();
+    if (engine == null) {
+      throw new RuntimeException("Invalid engine type: " + instance);
+    }
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java
new file mode 100644
index 0000000000..91cfeb6ce5
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.Date;
+
+/**
+ * A table service action (e.g. a compaction) scheduled for one instant of a table.
+ *
+ * <p>Field order matters: it defines the parameter order of the Lombok-generated all-args
+ * constructor.
+ */
+@Builder
+@Getter
+@Setter
+@ToString
+@NoArgsConstructor
+@AllArgsConstructor
+public class Instance {
+
+  private long id;
+
+  private String dbName;
+
+  private String tableName;
+
+  private String basePath;
+
+  /** Engine the action is submitted to. */
+  private Engine executionEngine;
+
+  private String owner;
+
+  private String queue;
+
+  private String resource;
+
+  private String parallelism;
+
+  /** Instant the action applies to (presumably a Hudi timeline instant time). */
+  private String instant;
+
+  /** Integer code of an {@link Action}. */
+  private int action;
+
+  /** Integer code of an {@link InstanceStatus}. */
+  private int status;
+
+  private int runTimes;
+
+  private String applicationId;
+
+  private String doradoJobId;
+
+  private Date scheduleTime;
+
+  private Date createTime;
+
+  private Date updateTime;
+
+  private boolean isDeleted;
+
+  /** Returns {@code dbName.tableName}. */
+  public String getFullTableName() {
+    return dotJoin(dbName, tableName);
+  }
+
+  /** Returns {@code dbName.tableName.instant.status}. */
+  public String getIdentifier() {
+    return dotJoin(dbName, tableName, instant, status);
+  }
+
+  /** Returns {@code dbName.tableName.instant.status.runTimes.updateTime}. */
+  public String getInstanceRunStatus() {
+    return dotJoin(dbName, tableName, instant, status, runTimes, updateTime);
+  }
+
+  /** Returns {@code dbName.tableName.instant}. */
+  public String getRecordKey() {
+    return dotJoin(dbName, tableName, instant);
+  }
+
+  /** Joins {@code parts} with '.', rendering each one the way string concatenation does. */
+  private static String dotJoin(Object... parts) {
+    StringBuilder joined = new StringBuilder();
+    for (int i = 0; i < parts.length; i++) {
+      if (i > 0) {
+        joined.append('.');
+      }
+      joined.append(parts[i]);
+    }
+    return joined.toString();
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java
new file mode 100644
index 0000000000..4577cd236b
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.entity;
+
+/**
+ * Lifecycle states of an {@link Instance}, stored as their integer code.
+ */
+public enum InstanceStatus {
+
+  SCHEDULED(0, "scheduled"),
+  RUNNING(1, "running"),
+  FAILED(2, "failed"),
+  INVALID(3, "invalid"),
+  COMPLETED(4, "completed");
+
+  // Not final only because of the deprecated setters below.
+  private int status;
+  private String desc;
+
+  InstanceStatus(int status, String desc) {
+    this.status = status;
+    this.desc = desc;
+  }
+
+  /** Returns the integer code of this status. */
+  public int getStatus() {
+    return status;
+  }
+
+  /**
+   * Changes the code of this constant for the whole JVM.
+   *
+   * @deprecated enum constants are shared singletons; changing their code breaks
+   *     {@link #getInstance(int)} and the meaning of every stored status. Do not call.
+   */
+  @Deprecated
+  public void setStatus(int status) {
+    this.status = status;
+  }
+
+  /** Returns the lower-case description of this status. */
+  public String getDesc() {
+    return desc;
+  }
+
+  /**
+   * Changes the description of this constant for the whole JVM.
+   *
+   * @deprecated enum constants are shared singletons and should be immutable. Do not call.
+   */
+  @Deprecated
+  public void setDesc(String desc) {
+    this.desc = desc;
+  }
+
+  /**
+   * Resolves an integer status code.
+   *
+   * @throws IllegalArgumentException if no status has code {@code status}
+   */
+  public static InstanceStatus getInstance(int status) {
+    for (InstanceStatus instanceStatus : values()) {
+      if (instanceStatus.status == status) {
+        return instanceStatus;
+      }
+    }
+    throw new IllegalArgumentException("Invalid instance status: " + status);
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java
new file mode 100644
index 0000000000..84db007854
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.exception;
+
+import org.apache.hudi.exception.HoodieException;
+
+/**
+ * Exception raised for failures inside the table management service.
+ */
+public class HoodieTableManagementException extends HoodieException {
+
+  /**
+   * Creates an exception with a message only.
+   *
+   * @param message description of the failure
+   */
+  public HoodieTableManagementException(String message) {
+    super(message);
+  }
+
+  /**
+   * Creates an exception wrapping the failure that caused it.
+   *
+   * @param message description of the failure
+   * @param cause   exception that triggered the failure
+   */
+  public HoodieTableManagementException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java
new file mode 100644
index 0000000000..e3ac837492
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.executor;
+
+import org.apache.hudi.table.management.common.ServiceConfig;
+import org.apache.hudi.table.management.common.ServiceContext;
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.entity.InstanceStatus;
+import org.apache.hudi.table.management.executor.submitter.ExecutionEngine;
+import org.apache.hudi.table.management.executor.submitter.SparkEngine;
+import org.apache.hudi.table.management.store.jdbc.InstanceDao;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for running one table service action of an {@link Instance}.
+ *
+ * <p>{@link #run()} registers the instance in {@link ServiceContext}, calls {@link #doExecute()},
+ * persists the resulting status through {@link InstanceDao} and always unregisters the instance.
+ */
+public abstract class BaseActionExecutor implements Runnable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseActionExecutor.class);
+
+  protected InstanceDao instanceDao;
+  protected Instance instance;
+  public int maxFailTolerance;
+  protected ExecutionEngine engine;
+
+  /**
+   * Creates an executor for {@code instance} backed by the engine the instance asks for.
+   *
+   * <p>Calls the overridable {@link #getJobName(Instance)} before subclass fields are
+   * initialized, so implementations must only rely on their argument.
+   *
+   * @throws IllegalStateException if the instance has no engine or its engine is unsupported
+   */
+  public BaseActionExecutor(Instance instance) {
+    this.instance = instance;
+    this.instanceDao = ServiceContext.getInstanceDao();
+    this.maxFailTolerance = ServiceConfig.getInstance()
+        .getInt(ServiceConfig.ServiceConfVars.IntraMaxFailTolerance);
+    // NOTE(review): the compaction main class is used regardless of the action type.
+    String mainClass = ServiceConfig.getInstance()
+        .getString(ServiceConfig.ServiceConfVars.CompactionMainClass);
+    if (instance.getExecutionEngine() == null) {
+      // A switch on a null enum would only surface as a bare NullPointerException.
+      throw new IllegalStateException(
+          "No execution engine set for instance: " + instance.getIdentifier());
+    }
+    switch (instance.getExecutionEngine()) {
+      case SPARK:
+        engine = new SparkEngine(getJobName(instance), instance, mainClass);
+        break;
+      case FLINK: // not supported yet
+      default:
+        throw new IllegalStateException("Unexpected value: " + instance.getExecutionEngine());
+    }
+  }
+
+  /**
+   * Executes the action, then clears its running (and, with the compaction cache enabled, its
+   * pending) registration in {@link ServiceContext}, even if execution throws.
+   */
+  @Override
+  public void run() {
+    ServiceContext.addRunningInstance(instance.getRecordKey(), getThreadIdentifier());
+    try {
+      execute();
+    } finally {
+      ServiceContext.removeRunningInstance(instance.getRecordKey());
+      if (ServiceConfig.getInstance()
+          .getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable)) {
+        ServiceContext.removePendingInstant(instance.getRecordKey());
+      }
+    }
+  }
+
+  /**
+   * Runs the action for {@link #instance}.
+   *
+   * @return true if the action succeeded
+   */
+  public abstract boolean doExecute();
+
+  /** Returns the job name used when submitting {@code instance} to the engine. */
+  public abstract String getJobName(Instance instance);
+
+  /**
+   * Runs {@link #doExecute()} and persists COMPLETED or FAILED through
+   * {@link InstanceDao#updateStatus}. An exception from {@code doExecute()} is logged and treated
+   * as a failure.
+   */
+  public void execute() {
+    try {
+      if (doExecute()) {
+        instance.setStatus(InstanceStatus.COMPLETED.getStatus());
+        LOG.info("Success exec instance: {}", instance.getIdentifier());
+      } else {
+        instance.setStatus(InstanceStatus.FAILED.getStatus());
+        LOG.warn("Fail exec instance: {}", instance.getIdentifier());
+      }
+    } catch (Exception e) {
+      instance.setStatus(InstanceStatus.FAILED.getStatus());
+      LOG.error("Fail exec instance: {}", instance.getIdentifier(), e);
+    }
+    instanceDao.updateStatus(instance);
+  }
+
+  /** Describes the current thread as {@code id.name.state}. */
+  public String getThreadIdentifier() {
+    Thread current = Thread.currentThread();
+    return current.getId() + "." + current.getName() + "." + current.getState();
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getName() + ", instance: " + instance.getIdentifier();
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java
new file mode 100644
index 0000000000..9e21663651
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.executor;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.entity.InstanceStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs a compaction of one {@link Instance} on the execution engine chosen by the base class.
+ */
+public class CompactionExecutor extends BaseActionExecutor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionExecutor.class);
+
+  /** Job name pattern, filled with database name, table name and instant. */
+  public static final String COMPACT_JOB_NAME = "Hoodie compact %s.%s %s";
+
+  public CompactionExecutor(Instance instance) {
+    super(instance);
+  }
+
+  /**
+   * Persists the instance as RUNNING, then hands it to the engine.
+   *
+   * @return true if the engine returned a non-empty application id
+   */
+  @Override
+  public boolean doExecute() {
+    final String jobName = getJobName(instance);
+    LOG.info("Start exec : " + jobName);
+
+    instance.setStatus(InstanceStatus.RUNNING.getStatus());
+    instanceDao.saveInstance(instance);
+
+    final String applicationId = engine.execute(jobName, instance);
+    final boolean succeeded = !StringUtils.isNullOrEmpty(applicationId);
+    if (succeeded) {
+      LOG.info("Compaction successfully completed for " + jobName);
+    } else {
+      LOG.warn("Failed to run compaction for " + jobName);
+    }
+    return succeeded;
+  }
+
+  /**
+   * Formats {@link #COMPACT_JOB_NAME}. Reads only its argument, which keeps it safe to call from
+   * the superclass constructor.
+   */
+  @Override
+  public String getJobName(Instance instance) {
+    return String.format(COMPACT_JOB_NAME,
+        instance.getDbName(), instance.getTableName(), instance.getInstant());
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java
new file mode 100644
index 0000000000..c4fdb98faf
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.executor.submitter;
+
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.exception.HoodieTableManagementException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Launches the job for an {@link Instance} on an execution engine.
+ *
+ * <p>Subclasses supply the launch command, its environment and a pre-launch hook.
+ */
+public abstract class ExecutionEngine {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionEngine.class);
+
+  protected static final String YARN_SUBMITTED = "Submitted application";
+
+  /**
+   * Runs {@link #beforeExecuteCommand()} and then {@link #executeCommand(String, Instance)}.
+   *
+   * @param jobName  name of the job
+   * @param instance instance the job runs for
+   * @return the application id of the launched job
+   * @throws HoodieTableManagementException if any step fails
+   */
+  public String execute(String jobName, Instance instance) throws HoodieTableManagementException {
+    try {
+      LOG.info("Submitting instance {}:{}", jobName, instance.getIdentifier());
+      beforeExecuteCommand();
+      return executeCommand(jobName, instance);
+    } catch (Exception e) {
+      throw new HoodieTableManagementException("Failed submit instance " + instance, e);
+    }
+  }
+
+  /**
+   * Builds the launch command and environment, then launches the job.
+   *
+   * <p>Launching is not implemented yet, so this method always throws after logging the command
+   * and environment. Returning a placeholder application id instead would make callers treat the
+   * job as submitted and successful although nothing ran.
+   *
+   * @throws HoodieTableManagementException if the command or environment cannot be built
+   * @throws UnsupportedOperationException  once they are built, until launching is implemented
+   */
+  protected String executeCommand(String jobName, Instance instance) {
+    String command = "";
+    try {
+      command = getCommand();
+      LOG.info("Execute command: {}", command);
+      Map<String, String> env = setProcessEnv();
+      LOG.info("Execute env: {}", env);
+    } catch (Exception e) {
+      LOG.error("Execute command error with exception: ", e);
+      throw new HoodieTableManagementException("Execute " + command + " command error", e);
+    }
+    // TODO: run the command with the environment, wait for the process to finish and return the
+    //  application id it reports (null or empty when the launch failed).
+    throw new UnsupportedOperationException(
+        "Launching job " + jobName + " is not implemented yet, command: " + command);
+  }
+
+  /** Returns the command that launches the job. */
+  protected abstract String getCommand() throws IOException;
+
+  /** Hook run by {@link #execute(String, Instance)} before the command is built. */
+  protected abstract void beforeExecuteCommand();
+
+  /** Returns the environment variables the job process is launched with. */
+  public abstract Map<String, String> setProcessEnv();
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java
new file mode 100644
index 0000000000..99babcd2f5
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.executor.submitter;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.table.management.common.EnvConstant;
+import org.apache.hudi.table.management.common.ServiceConfig;
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.exception.HoodieTableManagementException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.cli.utils.SparkUtil.initLauncher;
+
+public class SparkEngine extends ExecutionEngine {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkEngine.class);
+
+ private String jobName;
+ private Instance instance;
+ private String mainClass;
+
+ public SparkEngine(String jobName, Instance instance, String mainClass) {
+ this.jobName = jobName;
+ this.instance = instance;
+ this.mainClass = mainClass;
+ }
+
+ @Override
+ protected String getCommand() throws IOException {
+ String format = "%s/bin/spark-submit --class %s --master yarn --deploy-mode cluster %s %s %s";
+ return String.format(format, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome),
+ mainClass, getSparkArgs(), getSubmitJar(), getJobArgs());
+ }
+
+ @Override
+ protected void beforeExecuteCommand() {
+
+ }
+
+ @Override
+ public Map<String, String> setProcessEnv() {
+ Map<String, String> env = new HashMap<>(16);
+ env.put(EnvConstant.JAVA_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.JavaHome));
+ env.put(EnvConstant.YARN_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.YarnConfDir));
+ env.put(EnvConstant.SPARK_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome));
+ env.put(EnvConstant.HADOOP_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.HadoopConfDir));
+ env.put(EnvConstant.HADOOP_USER_NAME, instance.getOwner());
+ return env;
+ }
+
+ private String getJobArgs() throws IOException {
+ return null;
+ }
+
+ private String getSubmitJar() throws IOException {
+ File sparkSubmitJarPath = new File(ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSubmitJarPath));
+ if (!sparkSubmitJarPath.isDirectory()) {
+ throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath + " should be be a directory");
+ }
+ File[] jars = sparkSubmitJarPath.listFiles(file -> !file.getName().endsWith(".jar"));
+ if (jars == null || jars.length != 1) {
+ throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath
+ + " should only have one jar, jars = " + Arrays.toString(jars));
+ }
+ return jars[0].getCanonicalPath();
+ }
+
+ private String getSparkArgs() {
+ StringBuilder sparkArgs = new StringBuilder();
+ sparkArgs.append("--queue ").append(instance.getQueue());
+ sparkArgs.append(" --name ").append(jobName);
+
+ Map<String, String> sparkParams = new HashMap<>();
+ sparkParams.put("mapreduce.job.queuename", instance.getQueue());
+ sparkParams.put("spark.shuffle.hdfs.enabled", ServiceConfig.getInstance()
+ .getString(ServiceConfig.ServiceConfVars.SparkShuffleHdfsEnabled));
+ String parallelism = StringUtils.isNullOrEmpty(instance.getParallelism())
+ ? ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MaxExecutors)
+ : instance.getParallelism();
+ sparkParams.put("spark.dynamicAllocation.maxExecutors", parallelism);
+ sparkParams.put("spark.dynamicAllocation.minExecutors",
+ ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MinExecutors));
+ sparkParams.put("spark.vcore.boost",
+ ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoost));
+ sparkParams.put("spark.vcore.boost.ratio",
+ ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoostRatio));
+ sparkParams.put("spark.speculation",
+ ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSpeculation));
+ String driverResource;
+ String executorResource;
+ String resource = instance.getResource().trim();
+ if (StringUtils.isNullOrEmpty(resource)) {
+ driverResource = ServiceConfig.getInstance()
+ .getString(ServiceConfig.ServiceConfVars.DriverMemory);
+ executorResource = ServiceConfig.getInstance()
+ .getString(ServiceConfig.ServiceConfVars.ExecutorMemory);
+ } else {
+ String[] resourceArray = resource.split(":");
+ if (resourceArray.length == 1) {
+ driverResource = resourceArray[0];
+ executorResource = resourceArray[0];
+ } else if (resourceArray.length == 2) {
+ driverResource = resourceArray[0];
+ executorResource = resourceArray[1];
+ } else {
+ throw new RuntimeException(
+ "Invalid conf: " + instance.getIdentifier() + ", resource: " + resource);
+ }
+ }
+ sparkParams.put("spark.executor.cores",
+ ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorCores));
+ sparkParams.put("spark.executor.memory", executorResource);
+ sparkParams.put("spark.driver.memory", driverResource);
+ sparkParams.put("spark.executor.memoryOverhead", ServiceConfig.getInstance()
+ .getString(ServiceConfig.ServiceConfVars.ExecutorMemoryOverhead));
+
+ for (Map.Entry<String, String> entry : sparkParams.entrySet()) {
+ sparkArgs
+ .append(" --conf ")
+ .append(entry.getKey())
+ .append("=")
+ .append(entry.getValue());
+ }
+
+ return sparkArgs.toString();
+ }
+
+ @Override
+ public String executeCommand(String jobName, Instance instance) throws HoodieTableManagementException {
+ String sparkPropertiesPath =
+ Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
+ SparkLauncher sparkLauncher;
+ try {
+ sparkLauncher = initLauncher(sparkPropertiesPath);
+ } catch (URISyntaxException e) {
+ LOG.error("Failed to init spark launcher");
+ throw new HoodieTableManagementException("Failed to init spark launcher", e);
+ }
+
+ String master = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkMaster);
+ String sparkMemory = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorMemory);
+ String parallelism = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkParallelism);
+ String retry = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.RetryTimes);
+
+ sparkLauncher.addAppArgs("COMPACT_RUN", master, sparkMemory, instance.getBasePath(),
+ instance.getTableName(), instance.getInstant(), parallelism, "", retry, "");
+
+ Process process;
+ try {
+ process = sparkLauncher.launch();
+ } catch (IOException e) {
+ LOG.error("Failed to launcher spark process");
+ throw new HoodieTableManagementException("Failed to init spark launcher", e);
+ }
+
+ InputStream inputStream = null;
+ BufferedReader bufferedReader = null;
+ String applicationId = null;
+ try {
+ inputStream = process.getInputStream();
+ bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
+ String line;
+ while ((line = bufferedReader.readLine()) != null) {
+ LOG.info(line);
+ if (line.contains(YARN_SUBMITTED)) {
+ String[] split = line.split(YARN_SUBMITTED);
+ applicationId = split[1].trim();
+ LOG.info("Execute job {} get application id {}", jobName, applicationId);
+ break;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("execute {} process get application id error", jobName, e);
+ throw new HoodieTableManagementException("execute " + jobName + " process get application id error", e);
+ } finally {
+ if (process != null) {
+ process.destroyForcibly();
+ }
+ if (inputStream != null) {
+ IOUtils.closeQuietly(inputStream);
+ }
+ if (bufferedReader != null) {
+ IOUtils.closeQuietly(bufferedReader);
+ }
+ }
+
+ return applicationId;
+ }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java
new file mode 100644
index 0000000000..62e8ca45dc
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.handlers;
+
+import org.apache.hudi.table.management.common.ServiceConfig;
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.store.MetadataStore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ActionHandler implements AutoCloseable {
+ private static Logger LOG = LoggerFactory.getLogger(ActionHandler.class);
+
+ protected final Configuration conf;
+ protected final FileSystem fileSystem;
+ protected final MetadataStore metadataStore;
+
+ private final CompactionHandler compactionHandler;
+
+ public ActionHandler(Configuration conf,
+ MetadataStore metadataStore) throws IOException {
+ this.conf = conf;
+ this.fileSystem = FileSystem.get(conf);
+ this.metadataStore = metadataStore;
+ boolean cacheEnable = ServiceConfig.getInstance().getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable);
+ this.compactionHandler = new CompactionHandler(cacheEnable);
+ }
+
+ public void scheduleCompaction(Instance instance) {
+ compactionHandler.scheduleCompaction(metadataStore, instance);
+ }
+
+ public void removeCompaction(Instance instance) throws IOException {
+ compactionHandler.removeCompaction(metadataStore, instance);
+ }
+
+ // TODO: support clustering
+ public void scheduleClustering(Instance instance) {
+
+ }
+
+ public void removeClustering(Instance instance) {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.fileSystem.close();
+ }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java
new file mode 100644
index 0000000000..1afc973902
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.handlers;
+
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.store.MetadataStore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST Handler servicing clustering requests.
+ */
+public class ClusteringHandler {
+
+ private static Logger LOG = LoggerFactory.getLogger(ClusteringHandler.class);
+
+ public void scheduleClustering(MetadataStore metadataStore,
+ Instance instance) {
+ LOG.info("Start register compaction instance: " + instance.getIdentifier());
+ metadataStore.saveInstance(instance);
+ }
+
+ public void removeClustering(MetadataStore metadataStore,
+ Instance instance) {
+ LOG.info("Start remove clustering instance: " + instance.getIdentifier());
+ // 1. check instance exist
+ Instance result = metadataStore.getInstance(instance);
+ if (result == null) {
+ throw new RuntimeException("Instance not exist: " + instance);
+ }
+ // 2. update status
+ metadataStore.updateStatus(instance);
+ }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java
new file mode 100644
index 0000000000..4edf962617
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.handlers;
+
+import org.apache.hudi.table.management.common.ServiceContext;
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.store.MetadataStore;
+
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * REST Handler servicing compaction requests.
+ */
+public class CompactionHandler {
+ private static Logger LOG = LoggerFactory.getLogger(CompactionHandler.class);
+ protected boolean cacheEnable;
+
+ public CompactionHandler(boolean cacheEnable) {
+ this.cacheEnable = cacheEnable;
+ }
+
+ public void scheduleCompaction(MetadataStore metadataStore,
+ Instance instance) {
+ String recordKey = instance.getRecordKey();
+ LOG.info("Start register compaction instance: " + recordKey);
+ if ((cacheEnable && ServiceContext.containsPendingInstant(recordKey))
+ || metadataStore.getInstance(instance) != null) {
+ LOG.warn("Instance has existed, instance: " + instance);
+ } else {
+ metadataStore.saveInstance(instance);
+ }
+ if (cacheEnable) {
+ ServiceContext.refreshPendingInstant(recordKey);
+ }
+ }
+
+ public void removeCompaction(@NotNull MetadataStore metadataStore,
+ Instance instance) {
+ LOG.info("Start remove compaction instance: " + instance.getIdentifier());
+ // 1. check instance exist
+ Instance result = metadataStore.getInstance(instance);
+ if (result == null) {
+ throw new RuntimeException("Instance not exist: " + instance);
+ }
+ // 2. update status
+ metadataStore.updateStatus(instance);
+ }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java
new file mode 100644
index 0000000000..5855535e36
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.service;
+
/**
 * Lifecycle contract for the table management background services.
 */
public interface BaseService {

  /** Prepares the service's resources (e.g. its executors) before it is started. */
  void init();

  /** Starts the service's background work; expected to be called after {@link #init()}. */
  void startService();

  /** Stops the service and releases its resources. */
  void stop();

}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java
new file mode 100644
index 0000000000..b792dfee75
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.service;
+
+import org.apache.hudi.table.management.common.ServiceContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class CleanService implements BaseService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CleanService.class);
+ private ScheduledExecutorService service;
+ private long cacheInterval = 3600 * 1000; //ms
+
+ @Override
+ public void init() {
+ LOG.info("Init service: " + CleanService.class.getName());
+ //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Clean-Service-%d").build();
+ this.service = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @Override
+ public void startService() {
+ LOG.info("Start service: " + CleanService.class.getName());
+ service.scheduleAtFixedRate(new RetryRunnable(), 30, 300, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stop service: " + CleanService.class.getName());
+ if (service != null && !service.isShutdown()) {
+ service.shutdown();
+ }
+ }
+
+ private class RetryRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ cleanCache();
+ }
+ }
+
+ private void cleanCache() {
+ long currentTime = System.currentTimeMillis();
+ ConcurrentHashMap<String, Long> pendingInstances = ServiceContext.getPendingInstances();
+ for (Map.Entry<String, Long> instance : pendingInstances.entrySet()) {
+ if (currentTime - instance.getValue() > cacheInterval) {
+ LOG.info("Instance has expired: " + instance.getKey());
+ pendingInstances.remove(instance.getKey());
+ }
+ }
+ }
+
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java
new file mode 100644
index 0000000000..919199a51f
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.service;
+
+import org.apache.hudi.table.management.common.ServiceConfig;
+import org.apache.hudi.table.management.common.ServiceContext;
+import org.apache.hudi.table.management.executor.BaseActionExecutor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class ExecutorService implements BaseService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
+
+ private ThreadPoolExecutor executorService;
+ private ScheduledExecutorService service;
+ private BlockingQueue<BaseActionExecutor> taskQueue;
+ private int coreExecuteSize;
+ private int maxExecuteSize;
+
+ public void init() {
+ service = Executors.newSingleThreadScheduledExecutor();
+ coreExecuteSize = ServiceConfig.getInstance()
+ .getInt(ServiceConfig.ServiceConfVars.CoreExecuteSize);
+ maxExecuteSize = ServiceConfig.getInstance()
+ .getInt(ServiceConfig.ServiceConfVars.MaxExecuteSize);
+ //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Executor-Service-%d").build();
+ executorService = new ThreadPoolExecutor(coreExecuteSize, maxExecuteSize, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<>());
+ taskQueue = new LinkedBlockingQueue<>();
+ LOG.info("Init service: " + ExecutorService.class.getName() + ", coreExecuteSize: "
+ + coreExecuteSize + ", maxExecuteSize: " + maxExecuteSize);
+ }
+
+ @Override
+ public void startService() {
+ LOG.info("Start service: " + ExecutorService.class.getName());
+ service.submit(new ExecutionTask());
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stop service: " + ExecutorService.class.getName());
+ if (executorService != null && !executorService.isShutdown()) {
+ executorService.shutdown();
+ }
+ if (service != null && service.isShutdown()) {
+ service.shutdown();
+ }
+ LOG.info("Finish stop service: " + ExecutorService.class.getName());
+ }
+
+ private class ExecutionTask implements Runnable {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ BaseActionExecutor executor = taskQueue.take();
+ LOG.info("Start execute: " + executor);
+ executorService.execute(executor);
+ } catch (InterruptedException interruptedException) {
+ LOG.error("Occur exception when exec job: " + interruptedException);
+ }
+ }
+ }
+ }
+
+ public void submitTask(BaseActionExecutor task) {
+ taskQueue.add(task);
+ }
+
+ public int getFreeSize() {
+ return maxExecuteSize - ServiceContext.getRunningInstanceNum();
+ }
+
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java
new file mode 100644
index 0000000000..164549a38b
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.service;
+
+import org.apache.hudi.table.management.common.ServiceContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class MonitorService implements BaseService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MonitorService.class);
+
+ private ScheduledExecutorService service;
+
+ @Override
+ public void init() {
+ LOG.info("Init service: " + MonitorService.class);
+ this.service = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @Override
+ public void startService() {
+ LOG.info("Start service: " + MonitorService.class.getName());
+ service.scheduleAtFixedRate(new MonitorRunnable(), 30, 180, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stop service: " + MonitorService.class.getName());
+ if (service != null && !service.isShutdown()) {
+ service.shutdown();
+ }
+ }
+
+ private class MonitorRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ for (String info : ServiceContext.getRunningInstanceInfo()) {
+ LOG.info(info);
+ }
+ }
+ }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java
new file mode 100644
index 0000000000..17bc3bb5df
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.service;
+
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.entity.InstanceStatus;
+import org.apache.hudi.table.management.store.MetadataStore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class RetryService implements BaseService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RetryService.class);
+
+ private MetadataStore metadataStore;
+ private ScheduledExecutorService service;
+
+ public RetryService(MetadataStore metadataStore) {
+ this.metadataStore = metadataStore;
+ }
+
+ @Override
+ public void init() {
+ LOG.info("Init service: " + RetryService.class.getName());
+ //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Retry-Service-%d").build();
+ this.service = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @Override
+ public void startService() {
+ LOG.info("Start service: " + RetryService.class.getName());
+ service.scheduleAtFixedRate(new RetryRunnable(), 30, 180, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stop service: " + RetryService.class.getName());
+ if (service != null && !service.isShutdown()) {
+ service.shutdown();
+ }
+ }
+
+ private class RetryRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ submitFailTask();
+ }
+ }
+
+ public void submitFailTask() {
+ List<Instance> failInstances = metadataStore.getRetryInstances();
+ for (Instance instance : failInstances) {
+ LOG.info("Start retry instance: " + instance.getIdentifier());
+ instance.setStatus(InstanceStatus.SCHEDULED.getStatus());
+ metadataStore.updateStatus(instance);
+ }
+ }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java
new file mode 100644
index 0000000000..a7e1d708b5
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.service;
+
+import org.apache.hudi.table.management.common.ServiceConfig;
+import org.apache.hudi.table.management.entity.Action;
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.entity.InstanceStatus;
+import org.apache.hudi.table.management.exception.HoodieTableManagementException;
+import org.apache.hudi.table.management.executor.BaseActionExecutor;
+import org.apache.hudi.table.management.executor.CompactionExecutor;
+import org.apache.hudi.table.management.store.MetadataStore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ScheduleService implements BaseService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ScheduleService.class);
+
+ private ScheduledExecutorService service;
+ private ExecutorService executionService;
+ private MetadataStore metadataStore;
+ private int compactionWaitInterval;
+
+ public ScheduleService(ExecutorService executionService,
+ MetadataStore metadataStore) {
+ this.executionService = executionService;
+ this.metadataStore = metadataStore;
+ this.compactionWaitInterval = ServiceConfig.getInstance()
+ .getInt(ServiceConfig.ServiceConfVars.CompactionScheduleWaitInterval);
+ }
+
+ @Override
+ public void init() {
+ LOG.info("Finish init schedule service, compactionWaitInterval: " + compactionWaitInterval);
+ //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Schedule-Service-%d").build();
+ this.service = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @Override
+ public void startService() {
+ LOG.info("Start service: " + ScheduleService.class.getName());
+ service.scheduleAtFixedRate(new ScheduleRunnable(), 30, 60, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void stop() {
+ LOG.info("Stop service: " + ScheduleService.class.getName());
+ if (service != null && !service.isShutdown()) {
+ service.shutdown();
+ }
+ }
+
+ private class ScheduleRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ submitReadyTask();
+ }
+ }
+
+ public void submitReadyTask() {
+ int limitSize = executionService.getFreeSize();
+ LOG.info("Start get ready instances, limitSize: " + limitSize);
+ if (limitSize > 0) {
+ List<Instance> readyInstances = metadataStore.getInstances(
+ InstanceStatus.SCHEDULED.getStatus(), limitSize);
+ for (Instance readyInstance : readyInstances) {
+ if (waitSchedule(readyInstance)) {
+ LOG.info("Instance should wait schedule: " + readyInstance.getInstanceRunStatus());
+ continue;
+ }
+ LOG.info("Schedule ready instances: " + readyInstance.getInstanceRunStatus());
+ BaseActionExecutor executor = getActionExecutor(readyInstance);
+ executionService.submitTask(executor);
+ }
+ }
+ }
+
+ private boolean waitSchedule(Instance instance) {
+ return instance.getAction() == Action.COMPACTION.getValue()
+ && instance.getUpdateTime().getTime() + compactionWaitInterval
+ > System.currentTimeMillis();
+ }
+
+ protected BaseActionExecutor getActionExecutor(Instance instance) {
+ if (instance.getAction() == Action.COMPACTION.getValue()) {
+ return new CompactionExecutor(instance);
+ } else {
+ throw new HoodieTableManagementException("Unsupported action " + instance.getAction());
+ }
+ }
+
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java
new file mode 100644
index 0000000000..8d730212cc
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.store;
+
+import org.apache.hudi.table.management.entity.AssistQueryEntity;
+import org.apache.hudi.table.management.entity.Instance;
+
+import java.util.List;
+
+/**
+ * Persistence abstraction for table management {@link Instance} records.
+ */
+public interface MetadataStore {
+
+  /** Initializes the store. */
+  void init();
+
+  // ---- writes ----
+
+  /** Persists a new instance record. */
+  void saveInstance(Instance instance);
+
+  /** Persists the status currently set on {@code instance}. */
+  void updateStatus(Instance instance);
+
+  // ---- reads ----
+
+  /**
+   * Looks up the stored record corresponding to {@code instance} (presumably matched on its
+   * identifying fields — confirm against the mapper).
+   */
+  Instance getInstance(Instance instance);
+
+  /**
+   * Returns instances in the given status.
+   *
+   * @param status numeric status value, e.g. {@code InstanceStatus.SCHEDULED.getStatus()}
+   * @param limit  maximum number of rows to return
+   */
+  List<Instance> getInstances(int status, int limit);
+
+  /** Returns instances that are candidates to be retried. */
+  List<Instance> getRetryInstances();
+
+  /** Returns instances matching the given alert query. */
+  List<Instance> getAlertInstances(AssistQueryEntity queryEntity);
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java
new file mode 100644
index 0000000000..ac42fe92aa
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.store.impl;
+
+import org.apache.hudi.table.management.entity.AssistQueryEntity;
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.store.MetadataStore;
+import org.apache.hudi.table.management.store.jdbc.InstanceDao;
+
+import java.util.List;
+
+/**
+ * {@link MetadataStore} backed by a relational database; every operation is delegated to an
+ * {@link InstanceDao} created at construction time.
+ */
+public class RelationDBBasedStore implements MetadataStore {
+
+  private final InstanceDao dao;
+
+  public RelationDBBasedStore() {
+    dao = new InstanceDao();
+  }
+
+  @Override
+  public void init() {
+    // Nothing to prepare here.
+  }
+
+  @Override
+  public void saveInstance(Instance instance) {
+    dao.saveInstance(instance);
+  }
+
+  @Override
+  public void updateStatus(Instance instance) {
+    dao.updateStatus(instance);
+  }
+
+  @Override
+  public Instance getInstance(Instance instance) {
+    return dao.getInstance(instance);
+  }
+
+  @Override
+  public List<Instance> getInstances(int status, int limit) {
+    return dao.getInstances(status, limit);
+  }
+
+  @Override
+  public List<Instance> getRetryInstances() {
+    return dao.getRetryInstances();
+  }
+
+  @Override
+  public List<Instance> getAlertInstances(AssistQueryEntity queryEntity) {
+    return dao.getAlertInstances(queryEntity);
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java
new file mode 100644
index 0000000000..e43fc21b8c
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.store.jdbc;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory;
+import org.apache.ibatis.io.Resources;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * MyBatis data source factory that replaces the unpooled data source with a HikariCP pool
+ * configured from the {@code hikariPool.properties} classpath resource.
+ */
+public class HikariDataSourceFactory extends UnpooledDataSourceFactory {
+  private static final String PROPERTIES_PATH = "hikariPool.properties";
+
+  /**
+   * @throws IOException if the properties resource cannot be found or read
+   */
+  public HikariDataSourceFactory() throws IOException {
+    Properties properties = new Properties();
+    // Close the classpath stream as soon as the properties are loaded.
+    try (java.io.InputStream inputStream = Resources.getResourceAsStream(PROPERTIES_PATH)) {
+      properties.load(inputStream);
+    }
+    HikariConfig config = new HikariConfig(properties);
+    this.dataSource = new HikariDataSource(config);
+  }
+}
\ No newline at end of file
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java
new file mode 100644
index 0000000000..4c106138f7
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.store.jdbc;
+
+import org.apache.hudi.table.management.entity.AssistQueryEntity;
+import org.apache.hudi.table.management.entity.Instance;
+import org.apache.hudi.table.management.entity.InstanceStatus;
+
+import org.apache.ibatis.session.RowBounds;
+import org.apache.ibatis.session.SqlSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class InstanceDao {
+
+ private static Logger LOG = LoggerFactory.getLogger(InstanceDao.class);
+
+ private static final String NAMESPACE = "Instance";
+
+ public void saveInstance(Instance instance) {
+ try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
+ sqlSession.insert(statement(NAMESPACE, "saveInstance"), instance);
+ sqlSession.commit();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void updateStatus(Instance instance) {
+ try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
+ int ret = sqlSession.update(statement(NAMESPACE, getUpdateStatusSqlId(instance)), instance);
+ sqlSession.commit();
+ if (ret != 1) {
+ LOG.error("Fail update status instance: " + instance);
+ throw new RuntimeException("Fail update status instance: " + instance.getIdentifier());
+ }
+ LOG.info("Success update status instance: " + instance.getIdentifier());
+ } catch (Exception e) {
+ LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void updateExecutionInfo(Instance instance) {
+ int retryNum = 0;
+ try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
+ while (retryNum++ < 3) {
+ int ret = sqlSession.update(statement(NAMESPACE, "updateExecutionInfo"), instance);
+ sqlSession.commit();
+ if (ret != 1) {
+ LOG.warn("Fail update execution info instance: " + instance);
+ TimeUnit.SECONDS.sleep(5);
+ } else {
+ LOG.info("Success update execution info, instance: " + instance.getIdentifier());
+ return;
+ }
+ }
+ throw new RuntimeException("Fail update execution info: " + instance.getIdentifier());
+ } catch (Exception e) {
+ LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Instance getInstance(Instance instance) {
+ try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
+ return sqlSession.selectOne(statement(NAMESPACE, "getInstance"), instance);
+ } catch (Exception e) {
+ LOG.error("Fail get Instance: " + instance.getIdentifier() + ", errMsg: ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String getUpdateStatusSqlId(Instance instance) {
+ switch (InstanceStatus.getInstance(instance.getStatus())) {
+ case SCHEDULED:
+ return "retryInstance";
+ case RUNNING:
+ return "runningInstance";
+ case COMPLETED:
+ return "successInstance";
+ case FAILED:
+ return "failInstance";
+ case INVALID:
+ return "invalidInstance";
+ default:
+ throw new RuntimeException("Invalid instance: " + instance.getIdentifier());
+ }
+ }
+
+ public List<Instance> getInstances(int status, int limit) {
+ try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
+ if (limit > 0) {
+ return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status,
+ new RowBounds(0, limit));
+ } else {
+ return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status);
+ }
+ } catch (Exception e) {
+ LOG.error("Fail get instances, status: " + status + ", errMsg: ", e);
+ throw new RuntimeException("Fail get instances, status: " + status);
+ }
+ }
+
+ public List<Instance> getRetryInstances() {
+ try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
+ return sqlSession.selectList(statement(NAMESPACE, "getRetryInstances"),
+ new AssistQueryEntity());
+ } catch (Exception e) {
+ LOG.error("Fail get retry instances, errMsg: ", e);
+ throw new RuntimeException("Fail get retry instances");
+ }
+ }
+
+ public List<Instance> getAlertInstances(AssistQueryEntity queryEntity) {
+ try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
+ return sqlSession.selectList(statement(NAMESPACE, "getAlertInstances"),
+ queryEntity);
+ } catch (Exception e) {
+ LOG.error("Fail get alert instances, errMsg: ", e);
+ throw new RuntimeException("Fail get alert instances");
+ }
+ }
+
+ public List<Instance> getInstanceAfterTime(AssistQueryEntity queryEntity) {
+ try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
+ return sqlSession.selectList(statement(NAMESPACE, "getInstanceAfterTime"), queryEntity);
+ } catch (Exception e) {
+ LOG.error("Fail get instances after time, errMsg: ", e);
+ throw new RuntimeException("Fail get alert instances");
+ }
+ }
+
+ private String statement(String namespace, String sqlID) {
+ return namespace + "." + sqlID;
+ }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java
new file mode 100644
index 0000000000..4dea08332c
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.store.jdbc;
+
+import org.apache.hudi.table.management.exception.HoodieTableManagementException;
+
+import org.apache.ibatis.io.Resources;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.apache.ibatis.session.SqlSessionFactoryBuilder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.PreparedStatement;
+import java.util.stream.Collectors;
+
+/**
+ * Holds the process-wide MyBatis {@link SqlSessionFactory} built from {@code mybatis-config.xml}.
+ *
+ * <p>The first {@link #openSqlSession()} call creates the factory lazily and runs the statements
+ * in {@code /table-management-service.sql} before publishing it. If
+ * {@link #initSqlSessionFactory()} was called earlier, the factory already exists and the script
+ * is not run automatically; call {@link #init()} in that case.
+ */
+public class SqlSessionFactoryUtil {
+
+  private static final String CONFIG_PATH = "mybatis-config.xml";
+
+  private static final String DDL_PATH = "/table-management-service.sql";
+
+  // volatile so a factory published under CLASS_LOCK is safely visible to unsynchronized readers.
+  private static volatile SqlSessionFactory sqlSessionFactory;
+  private static final Class<?> CLASS_LOCK = SqlSessionFactoryUtil.class;
+
+  private SqlSessionFactoryUtil() {
+
+  }
+
+  /**
+   * Builds the factory from {@code mybatis-config.xml} if it does not exist yet. Does not run the
+   * DDL script.
+   *
+   * @throws HoodieTableManagementException if the configuration cannot be read
+   */
+  public static void initSqlSessionFactory() {
+    synchronized (CLASS_LOCK) {
+      if (sqlSessionFactory == null) {
+        sqlSessionFactory = buildSqlSessionFactory();
+      }
+    }
+  }
+
+  /** Opens a new session; the caller must close it. */
+  public static SqlSession openSqlSession() {
+    return getOrCreateSqlSessionFactory().openSession();
+  }
+
+  /**
+   * Executes every statement of {@code /table-management-service.sql}. Each call re-runs the
+   * script, so it presumably needs to be idempotent (e.g. {@code CREATE TABLE IF NOT EXISTS}) —
+   * confirm against the script.
+   *
+   * @throws HoodieTableManagementException if the script cannot be read or executed
+   */
+  public static void init() {
+    executeDdl(getOrCreateSqlSessionFactory());
+  }
+
+  /** Returns the factory, building it and running the DDL script exactly once if it is missing. */
+  private static SqlSessionFactory getOrCreateSqlSessionFactory() {
+    SqlSessionFactory factory = sqlSessionFactory;
+    if (factory != null) {
+      return factory;
+    }
+    synchronized (CLASS_LOCK) {
+      if (sqlSessionFactory == null) {
+        SqlSessionFactory created = buildSqlSessionFactory();
+        // Run the DDL before publishing, so no other thread uses the factory before the tables exist.
+        executeDdl(created);
+        sqlSessionFactory = created;
+      }
+      return sqlSessionFactory;
+    }
+  }
+
+  /** Reads {@code mybatis-config.xml} from the classpath and builds a factory from it. */
+  private static SqlSessionFactory buildSqlSessionFactory() {
+    try (InputStream inputStream = Resources.getResourceAsStream(CONFIG_PATH)) {
+      return new SqlSessionFactoryBuilder().build(inputStream);
+    } catch (IOException e) {
+      throw new HoodieTableManagementException("Unable to load mybatis config: " + CONFIG_PATH, e);
+    }
+  }
+
+  /** Runs every non-blank DDL statement on a single session, which is then committed and closed. */
+  private static void executeDdl(SqlSessionFactory factory) {
+    try (SqlSession session = factory.openSession()) {
+      java.sql.Connection connection = session.getConnection();
+      for (String ddl : readDdlStatements()) {
+        // Splitting on ';' leaves a trailing fragment after the last statement; skip it.
+        if (ddl.trim().isEmpty()) {
+          continue;
+        }
+        try (PreparedStatement statement = connection.prepareStatement(ddl)) {
+          statement.execute();
+        }
+      }
+      if (!connection.getAutoCommit()) {
+        connection.commit();
+      }
+    } catch (Exception e) {
+      throw new HoodieTableManagementException("Unable to execute init ddl file: " + DDL_PATH, e);
+    }
+  }
+
+  /**
+   * Loads the DDL script, drops comment lines starting with {@code --}, and splits it on
+   * {@code ;}. Lines are re-joined with newlines so tokens on adjacent lines do not fuse.
+   * NOTE: a ';' inside a string literal would still split a statement.
+   */
+  private static String[] readDdlStatements() throws IOException {
+    try (InputStream inputStream = SqlSessionFactoryUtil.class.getResourceAsStream(DDL_PATH)) {
+      if (inputStream == null) {
+        throw new IOException("Init ddl file not found on classpath: " + DDL_PATH);
+      }
+      return org.apache.commons.io.IOUtils
+          .readLines(inputStream, java.nio.charset.StandardCharsets.UTF_8)
+          .stream()
+          .filter(line -> !line.trim().startsWith("--"))
+          .collect(Collectors.joining("\n"))
+          .split(";");
+    }
+  }
+
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java
new file mode 100644
index 0000000000..763047a26e
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.util;
+
+import java.util.Calendar;
+import java.util.Date;
+
+/**
+ * Small date helpers for the table management service.
+ */
+public class DateTimeUtils {
+
+  /**
+   * Returns "now" shifted by {@code amount} calendar days in the JVM's default time zone.
+   *
+   * @param amount number of days to add; negative values move into the past
+   * @return the shifted point in time
+   */
+  public static Date addDay(int amount) {
+    // Calendar.getInstance() is already positioned at the current time.
+    Calendar calendar = Calendar.getInstance();
+    calendar.add(Calendar.DAY_OF_MONTH, amount);
+    return calendar.getTime();
+  }
+}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java
new file mode 100644
index 0000000000..27139bf6f5
--- /dev/null
+++ b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.management.util;
+
+import org.apache.hudi.table.management.entity.Action;
+import org.apache.hudi.table.management.entity.Engine;
+import org.apache.hudi.table.management.entity.Instance;
+
+public class InstanceUtil {
+
+ public static void checkArgument(Instance instance) {
+ if (instance.getExecutionEngine() == null) {
+ instance.setExecutionEngine(Engine.SPARK);
+ }
+ Engine.checkEngineType(instance);
+ Action.checkActionType(instance);
+ }
+}
diff --git a/hudi-table-management-service/src/main/resources/hikariPool.properties b/hudi-table-management-service/src/main/resources/hikariPool.properties
new file mode 100644
index 0000000000..a14104a127
--- /dev/null
+++ b/hudi-table-management-service/src/main/resources/hikariPool.properties
@@ -0,0 +1,20 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+jdbcUrl=jdbc:h2:mem:tms;MODE=MYSQL
+dataSource.user=root
+dataSource.password=password
\ No newline at end of file
diff --git a/hudi-table-management-service/src/main/resources/logback.xml b/hudi-table-management-service/src/main/resources/logback.xml
new file mode 100644
index 0000000000..f4d55b0ab7
--- /dev/null
+++ b/hudi-table-management-service/src/main/resources/logback.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+
+  <!--
+    Console-only logging. Vendor-internal includes (and the FILE/AGENT appenders they defined)
+    are not available in Apache Hudi, so only appenders declared in this file are referenced.
+  -->
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
+      <layout class="ch.qos.logback.classic.PatternLayout">
+        <pattern>
+          %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
+        </pattern>
+      </layout>
+    </encoder>
+  </appender>
+
+  <root level="INFO">
+    <appender-ref ref="STDOUT"/>
+  </root>
+
+  <logger name="org.apache.spark" level="WARN"/>
+
+</configuration>
\ No newline at end of file
diff --git a/hudi-table-management-service/src/main/resources/mybatis-config.xml b/hudi-table-management-service/src/main/resources/mybatis-config.xml
new file mode 100644
index 0000000000..d9b6fc581b
--- /dev/null
+++ b/hudi-table-management-service/src/main/resources/mybatis-config.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
+<configuration>
+
+ <settings>
+ <setting name="lazyLoadingEnabled" value="false" />
+ <setting name="callSettersOnNulls" value="true"/>
+ <setting name="logImpl" value="STDOUT_LOGGING" />
+ </settings>
+
+ <typeAliases>
+
+ </typeAliases>
+
+ <environments default="development">
+ <environment id="development">
+ <transactionManager type="JDBC"/>
+ <dataSource type="org.apache.hudi.table.management.store.jdbc.HikariDataSourceFactory"/>
+ </environment>
+ </environments>
+
+ <mappers>
+ <mapper resource="mybatis/Instance.xml"/>
+ </mappers>
+
+</configuration>
diff --git a/hudi-table-management-service/src/main/resources/mybatis/Instance.xml b/hudi-table-management-service/src/main/resources/mybatis/Instance.xml
new file mode 100644
index 0000000000..c0d5d86d70
--- /dev/null
+++ b/hudi-table-management-service/src/main/resources/mybatis/Instance.xml
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
+ "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+
+<mapper namespace="Instance">
+ <!-- Maps rows of the instance table onto Instance; the primary key is declared with an id element so MyBatis can use it to identify result objects. -->
+ <resultMap type="org.apache.hudi.table.management.entity.Instance" id="InstanceMapping">
+ <id column="id" property="id" javaType="java.lang.Long"/>
+ <result column="db_name" property="dbName"/>
+ <result column="table_name" property="tableName"/>
+ <result column="base_path" property="basePath"/>
+ <result column="execution_engine" property="executionEngine"/>
+ <result column="owner" property="owner"/>
+ <result column="cluster" property="cluster"/>
+ <result column="queue" property="queue"/>
+ <result column="resource" property="resource"/>
+ <result column="parallelism" property="parallelism"/>
+ <result column="auto_clean" property="autoClean"/>
+ <result column="instant" property="instant"/>
+ <result column="action" property="action" javaType="java.lang.Integer"/>
+ <result column="status" property="status" javaType="java.lang.Integer"/>
+ <result column="run_times" property="runTimes" javaType="java.lang.Integer"/>
+ <result column="application_id" property="applicationId"/>
+ <result column="dorado_job_id" property="doradoJobId"/>
+ <result column="schedule_time" property="scheduleTime" javaType="java.util.Date"/>
+ <result column="create_time" property="createTime" javaType="java.util.Date"/>
+ <result column="update_time" property="updateTime" javaType="java.util.Date"/>
+ </resultMap>
+ <!-- Column list shared by every SELECT below; keep it in sync with InstanceMapping. -->
+ <sql id="selectColumns">
+ id, db_name, table_name, base_path, execution_engine, owner, cluster, queue, resource, parallelism, auto_clean,
+ instant, action, status, run_times, application_id, dorado_job_id, schedule_time, create_time, update_time
+ </sql>
+ <!-- Inserts a new instance with run_times = 0; the generated key is written back to Instance.id. -->
+ <insert id="saveInstance"
+ parameterType="org.apache.hudi.table.management.entity.Instance"
+ useGeneratedKeys="true" keyProperty="id">
+ INSERT INTO instance (db_name, table_name, base_path, execution_engine, owner, cluster,
+ queue, resource, parallelism, auto_clean, instant, action, status, run_times)
+ VALUES (#{dbName}, #{tableName}, #{basePath}, #{executionEngine}, #{owner}, #{cluster},
+ #{queue},#{resource}, #{parallelism}, #{autoClean}, #{instant}, #{action}, #{status}, 0)
+ </insert>
+ <!-- Sets #{status}, stamps schedule_time and increments run_times, but only while the row is in status 0 (otherwise no row is updated). -->
+ <update id="runningInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
+ UPDATE instance
+ SET status = #{status},
+ schedule_time = now(),
+ run_times = run_times + 1
+ WHERE db_name = #{dbName}
+ and table_name = #{tableName}
+ and instant = #{instant}
+ and status = 0
+ </update>
+ <!-- Sets #{status} for the (db_name, table_name, instant) row with no guard on its current status. -->
+ <update id="retryInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
+ UPDATE instance
+ SET status = #{status}
+ WHERE db_name = #{dbName}
+ and table_name = #{tableName}
+ and instant = #{instant}
+ </update>
+ <!-- Records dorado_job_id and application_id for the (db_name, table_name, instant) row. -->
+ <update id="updateExecutionInfo"
+ parameterType="org.apache.hudi.table.management.entity.Instance">
+ UPDATE instance
+ SET dorado_job_id = #{doradoJobId},
+ application_id = #{applicationId}
+ WHERE db_name = #{dbName}
+ and table_name = #{tableName}
+ and instant = #{instant}
+ </update>
+ <!-- Sets #{status} only while the row is in status 1. -->
+ <update id="successInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
+ UPDATE instance
+ SET status = #{status}
+ WHERE db_name = #{dbName}
+ and table_name = #{tableName}
+ and instant = #{instant}
+ and status = 1
+ </update>
+ <!-- Sets #{status} only while the row is in status 1 (same guard as successInstance). -->
+ <update id="failInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
+ UPDATE instance
+ SET status = #{status}
+ WHERE db_name = #{dbName}
+ and table_name = #{tableName}
+ and instant = #{instant}
+ and status = 1
+ </update>
+ <!-- Sets #{status} for the (db_name, table_name, instant) row with no guard on its current status. -->
+ <update id="invalidInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
+ UPDATE instance
+ SET status = #{status}
+ WHERE db_name = #{dbName}
+ and table_name = #{tableName}
+ and instant = #{instant}
+ </update>
+ <!-- Fetches one instance by its (db_name, table_name, instant) key. -->
+ <select id="getInstance" parameterType="org.apache.hudi.table.management.entity.Instance"
+ resultMap="InstanceMapping">
+ SELECT <include refid="selectColumns"/>
+ FROM instance
+ WHERE db_name = #{dbName}
+ and table_name = #{tableName}
+ and instant = #{instant}
+ </select>
+ <!-- All instances in the given status, ordered by id. -->
+ <select id="getInstances" parameterType="java.lang.Integer"
+ resultMap="InstanceMapping">
+ SELECT
+ <include refid="selectColumns"/>
+ FROM instance
+ WHERE status = #{status}
+ order by id
+ </select>
+ <!-- Instances in status 2, updated after queryStartTime, that have run at most maxRetry times. -->
+ <select id="getRetryInstances" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity"
+ resultMap="InstanceMapping">
+ SELECT
+ <include refid="selectColumns"/>
+ FROM instance
+ WHERE status = 2
+ and run_times <![CDATA[ <= ]]> #{maxRetry}
+ and update_time > #{queryStartTime}
+ order by id
+ </select>
+ <!-- Instances in #{status}, updated after queryStartTime, whose run_times exceeds maxRetry (the complement of the retry bound above). -->
+ <select id="getAlertInstances" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity"
+ resultMap="InstanceMapping">
+ SELECT
+ <include refid="selectColumns"/>
+ FROM instance
+ WHERE status = #{status}
+ and run_times > #{maxRetry}
+ and update_time > #{queryStartTime}
+ order by id
+ </select>
+ <!-- Instances in #{status} updated after queryStartTime, ordered by id. -->
+ <select id="getInstanceAfterTime" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity"
+ resultMap="InstanceMapping">
+ SELECT
+ <include refid="selectColumns"/>
+ FROM instance
+ WHERE status = #{status}
+ and update_time > #{queryStartTime}
+ order by id
+ </select>
+
+</mapper>
diff --git a/hudi-table-management-service/src/main/resources/table-management-service.sql b/hudi-table-management-service/src/main/resources/table-management-service.sql
new file mode 100644
index 0000000000..243880b2d9
--- /dev/null
+++ b/hudi-table-management-service/src/main/resources/table-management-service.sql
@@ -0,0 +1,46 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+CREATE TABLE if not exists `instance` -- one row per (db_name, table_name, instant), enforced by uniq_table_instant
+(
+ `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
+ `db_name` varchar(128) NOT NULL COMMENT 'db name',
+ `table_name` varchar(128) NOT NULL COMMENT 'table name',
+ `base_path` varchar(1024) NOT NULL COMMENT 'base path', -- widened from 128: a table path typically embeds db/table names of up to 128 chars each
+ `execution_engine` varchar(128) NOT NULL COMMENT 'execution engine',
+ `owner` varchar(128) NOT NULL COMMENT 'owner',
+ `cluster` varchar(128) NOT NULL COMMENT 'cluster',
+ `queue` varchar(128) NOT NULL COMMENT 'queue',
+ `resource` varchar(128) NOT NULL COMMENT 'resource',
+ `parallelism` varchar(128) NOT NULL COMMENT 'parallelism',
+ `auto_clean` int NOT NULL DEFAULT '0' COMMENT 'auto_clean',
+ `instant` varchar(128) NOT NULL COMMENT 'instant',
+ `action` int NOT NULL COMMENT 'action',
+ `status` int NOT NULL COMMENT 'status',
+ `run_times` int NOT NULL DEFAULT '0' COMMENT 'run times',
+ `application_id` varchar(128) DEFAULT NULL COMMENT 'application id',
+ `dorado_job_id` varchar(128) DEFAULT NULL COMMENT 'job id',
+ `schedule_time` timestamp NULL DEFAULT NULL COMMENT 'schedule time',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `uniq_table_instant` (`db_name`,`table_name`,`instant`),
+ KEY `idx_status` (`status`), -- kept: InnoDB appends the PK, so this serves "WHERE status = ? ORDER BY id" without a filesort
+ KEY `idx_status_update_time` (`status`,`update_time`) -- equality column first so "status = ? AND update_time > ?" can seek on both columns
+) COMMENT='Table Management Service instance';
+
diff --git a/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties b/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties
new file mode 100644
index 0000000000..b21b5d4070
--- /dev/null
+++ b/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties
@@ -0,0 +1,29 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=WARN, CONSOLE
+log4j.logger.org.apache.hudi=DEBUG
+
+# CONSOLE is set to be a ConsoleAppender.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+# CONSOLE uses PatternLayout.
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n
+log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
+log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
+log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
diff --git a/hudi-table-management-service/src/test/resources/log4j-surefire.properties b/hudi-table-management-service/src/test/resources/log4j-surefire.properties
new file mode 100644
index 0000000000..c03e808cca
--- /dev/null
+++ b/hudi-table-management-service/src/test/resources/log4j-surefire.properties
@@ -0,0 +1,30 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+log4j.rootLogger=WARN, CONSOLE
+log4j.logger.org.apache=INFO
+log4j.logger.org.apache.hudi=DEBUG
+
+# CONSOLE is set to be a ConsoleAppender.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+# CONSOLE uses PatternLayout.
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
+log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
+log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e3c8b3e8c6..96a86a0bb0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@
<module>hudi-hadoop-mr</module>
<module>hudi-spark-datasource</module>
<module>hudi-timeline-service</module>
+ <module>hudi-table-management-service</module>
<module>hudi-utilities</module>
<module>hudi-sync</module>
<module>packaging/hudi-hadoop-mr-bundle</module>