You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/06/21 08:58:57 UTC
[hudi] branch master updated: Revert master (#5925)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c7e430bb46 Revert master (#5925)
c7e430bb46 is described below
commit c7e430bb46ab721e4716ac89e1067d75a79b3179
Author: Zhaojing Yu <yu...@bytedance.com>
AuthorDate: Tue Jun 21 16:58:50 2022 +0800
Revert master (#5925)
* Revert "udate"
This reverts commit 092e35c1e300f1eb1a7474136826fed26bc10ccd.
* Revert "[HUDI-3475] Initialize hudi table management module."
This reverts commit 4640a3bbb8e212030f94848a0112784d98772de8.
---
.../cluster/ClusteringPlanActionExecutor.java | 20 --
.../apache/hudi/client/SparkRDDWriteClient.java | 4 -
hudi-table-management-service/pom.xml | 333 ---------------------
.../hudi/table/management/RequestHandler.java | 194 ------------
.../table/management/TableManagementServer.java | 153 ----------
.../hudi/table/management/common/EnvConstant.java | 29 --
.../table/management/common/ServiceConfig.java | 117 --------
.../table/management/common/ServiceContext.java | 78 -----
.../common/TableManagementServiceConfig.java | 29 --
.../hudi/table/management/entity/Action.java | 52 ----
.../table/management/entity/AssistQueryEntity.java | 47 ---
.../hudi/table/management/entity/Engine.java | 44 ---
.../hudi/table/management/entity/Instance.java | 92 ------
.../table/management/entity/InstanceStatus.java | 61 ----
.../exception/HoodieTableManagementException.java | 32 --
.../management/executor/BaseActionExecutor.java | 102 -------
.../management/executor/CompactionExecutor.java | 59 ----
.../executor/submitter/ExecutionEngine.java | 82 -----
.../management/executor/submitter/SparkEngine.java | 220 --------------
.../table/management/handlers/ActionHandler.java | 71 -----
.../management/handlers/ClusteringHandler.java | 51 ----
.../management/handlers/CompactionHandler.java | 66 ----
.../hudi/table/management/service/BaseService.java | 29 --
.../table/management/service/CleanService.java | 78 -----
.../table/management/service/ExecutorService.java | 102 -------
.../table/management/service/MonitorService.java | 65 ----
.../table/management/service/RetryService.java | 81 -----
.../table/management/service/ScheduleService.java | 116 -------
.../hudi/table/management/store/MetadataStore.java | 41 ---
.../store/impl/RelationDBBasedStore.java | 70 -----
.../store/jdbc/HikariDataSourceFactory.java | 38 ---
.../table/management/store/jdbc/InstanceDao.java | 156 ----------
.../store/jdbc/SqlSessionFactoryUtil.java | 82 -----
.../hudi/table/management/util/DateTimeUtils.java | 32 --
.../hudi/table/management/util/InstanceUtil.java | 34 ---
.../src/main/resources/hikariPool.properties | 20 --
.../src/main/resources/logback.xml | 41 ---
.../src/main/resources/mybatis-config.xml | 42 ---
.../src/main/resources/mybatis/Instance.xml | 165 ----------
.../main/resources/table-management-service.sql | 46 ---
.../test/resources/log4j-surefire-quiet.properties | 29 --
.../src/test/resources/log4j-surefire.properties | 30 --
pom.xml | 1 -
43 files changed, 3234 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
index 94ba014c4c..15ead5efb0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java
@@ -21,10 +21,8 @@ package org.apache.hudi.table.action.cluster;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -41,9 +39,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieClusteringPlan>> {
@@ -106,22 +102,6 @@ public class ClusteringPlanActionExecutor<T extends HoodieRecordPayload, I, K, O
throw new HoodieIOException("Exception scheduling clustering", ioe);
}
}
-
- if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.replacecommit.name())) {
- submitClusteringToService();
- }
-
return planOption;
}
-
- private void submitClusteringToService() {
- HoodieTableMetaClient metaClient = table.getMetaClient();
- List<String> instantsToSubmit = metaClient.getActiveTimeline()
- .filterPendingReplaceTimeline()
- .getInstants()
- .map(HoodieInstant::getTimestamp)
- .collect(Collectors.toList());
- HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(metaClient, config.getTableManagerConfig());
- tableManagerClient.submitClustering(instantsToSubmit);
- }
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index df38ba4a19..bdf478a8f6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -333,10 +333,6 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
- // do not compact a complete instant.
- if (table.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime)) {
- return null;
- }
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
diff --git a/hudi-table-management-service/pom.xml b/hudi-table-management-service/pom.xml
deleted file mode 100644
index d5251abc4a..0000000000
--- a/hudi-table-management-service/pom.xml
+++ /dev/null
@@ -1,333 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>hudi</artifactId>
- <groupId>org.apache.hudi</groupId>
- <version>0.12.0-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>hudi-table-management-service</artifactId>
-
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <mybatis.version>3.4.6</mybatis.version>
- </properties>
-
- <dependencies>
- <!-- Hoodie -->
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-cli</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-client-common</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!-- Spark -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- </dependency>
-
- <!-- Fasterxml -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-annotations</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <!-- Httpcomponents -->
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>fluent-hc</artifactId>
- </dependency>
-
- <dependency>
- <groupId>io.javalin</groupId>
- <artifactId>javalin</artifactId>
- <version>2.8.0</version>
- </dependency>
-
- <dependency>
- <groupId>com.beust</groupId>
- <artifactId>jcommander</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.rocksdb</groupId>
- <artifactId>rocksdbjni</artifactId>
- </dependency>
-
- <!-- Hadoop -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <!-- Need these exclusions to make sure JavaSparkContext can be setup. https://issues.apache.org/jira/browse/SPARK-1693 -->
- <exclusions>
- <exclusion>
- <artifactId>tools</artifactId>
- <groupId>com.sun</groupId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet.jsp</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet.jsp</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>tools</artifactId>
- <groupId>com.sun</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>tools</artifactId>
- <groupId>com.sun</groupId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-java-client</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.mybatis</groupId>
- <artifactId>mybatis</artifactId>
- <version>${mybatis.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.24</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>${avro.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>1.7.25</version>
- </dependency>
-
- <dependency>
- <groupId>com.zaxxer</groupId>
- <artifactId>HikariCP</artifactId>
- <version>4.0.3</version>
- </dependency>
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.23</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.8.2</version>
- </dependency>
-
- <!-- Test -->
- <dependency>
- <groupId>com.h2database</groupId>
- <artifactId>h2</artifactId>
- <version>1.4.200</version>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-api</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-common</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hudi</groupId>
- <artifactId>hudi-client-common</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>${maven-jar-plugin.version}</version>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- <phase>test-compile</phase>
- </execution>
- </executions>
- <configuration>
- <skip>false</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.4</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
- <mainClass>org.apache.hudi.compaction.service.TableManagerServer</mainClass>
- </transformer>
- </transformers>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
- <artifactSet>
- <excludes>
- <exclude>
- org.slf4j:slf4j-log4j12
- </exclude>
- </excludes>
- </artifactSet>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- <exclude>META-INF/services/javax.*</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
- <resources>
- <resource>
- <directory>src/main/resources</directory>
- </resource>
- <resource>
- <directory>src/test/resources</directory>
- </resource>
- </resources>
- </build>
-
-</project>
\ No newline at end of file
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java
deleted file mode 100644
index 32d2ebbe74..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/RequestHandler.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management;
-
-import org.apache.hudi.table.management.entity.Action;
-import org.apache.hudi.table.management.entity.Engine;
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.entity.InstanceStatus;
-import org.apache.hudi.table.management.handlers.ActionHandler;
-import org.apache.hudi.table.management.store.MetadataStore;
-import org.apache.hudi.table.management.util.InstanceUtil;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.javalin.Context;
-import io.javalin.Handler;
-import io.javalin.Javalin;
-import org.apache.hadoop.conf.Configuration;
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Main REST handler class that registers the table management endpoints and delegates calls to the action handler.
- */
-public class RequestHandler {
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class);
-
- private final Javalin app;
- private final ActionHandler actionHandler;
-
- public RequestHandler(Javalin app,
- Configuration conf,
- MetadataStore metadataStore) throws IOException {
- this.app = app;
- this.actionHandler = new ActionHandler(conf, metadataStore);
- }
-
- public void register() {
- registerCommonAPI();
- registerCompactionAPI();
- registerClusteringAPI();
- }
-
- private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException {
- boolean prettyPrint = ctx.queryParam("pretty") != null;
- long beginJsonTs = System.currentTimeMillis();
- String result =
- prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj);
- long endJsonTs = System.currentTimeMillis();
- LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs));
- ctx.result(result);
- }
-
- /**
- * Register common API calls.
- */
- private void registerCommonAPI() {
- app.get(HoodieTableManagerClient.REGISTER, new ViewHandler(ctx -> {
- }));
- }
-
- /**
- * Register Compaction API calls.
- */
- private void registerCompactionAPI() {
- app.get(HoodieTableManagerClient.SUBMIT_COMPACTION, new ViewHandler(ctx -> {
- for (String instant : ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow().split(",")) {
- Instance instance = Instance.builder()
- .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow())
- .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow())
- .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow())
- .action(Action.COMPACTION.getValue())
- .instant(instant)
- .executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableManagerClient.EXECUTION_ENGINE).getOrThrow()))
- .owner(ctx.validatedQueryParam(HoodieTableManagerClient.USERNAME).getOrThrow())
- .queue(ctx.validatedQueryParam(HoodieTableManagerClient.QUEUE).getOrThrow())
- .resource(ctx.validatedQueryParam(HoodieTableManagerClient.RESOURCE).getOrThrow())
- .parallelism(ctx.validatedQueryParam(HoodieTableManagerClient.PARALLELISM).getOrThrow())
- .status(InstanceStatus.SCHEDULED.getStatus())
- .build();
- InstanceUtil.checkArgument(instance);
- actionHandler.scheduleCompaction(instance);
- }
- }));
-
- app.get(HoodieTableManagerClient.REMOVE_COMPACTION, new ViewHandler(ctx -> {
- Instance instance = Instance.builder()
- .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow())
- .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow())
- .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow())
- .instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow())
- .status(InstanceStatus.INVALID.getStatus())
- .isDeleted(true)
- .build();
- actionHandler.removeCompaction(instance);
- }));
- }
-
- /**
- * Register Clustering API calls.
- */
- private void registerClusteringAPI() {
- app.get(HoodieTableManagerClient.SUBMIT_CLUSTERING, new ViewHandler(ctx -> {
- Instance instance = Instance.builder()
- .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow())
- .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow())
- .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow())
- .action(Action.CLUSTERING.getValue())
- .instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow())
- .executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableManagerClient.EXECUTION_ENGINE).getOrThrow()))
- .owner(ctx.validatedQueryParam(HoodieTableManagerClient.USERNAME).getOrThrow())
- .queue(ctx.validatedQueryParam(HoodieTableManagerClient.QUEUE).getOrThrow())
- .resource(ctx.validatedQueryParam(HoodieTableManagerClient.RESOURCE).getOrThrow())
- .parallelism(ctx.validatedQueryParam(HoodieTableManagerClient.PARALLELISM).getOrThrow())
- .status(InstanceStatus.SCHEDULED.getStatus())
- .build();
- InstanceUtil.checkArgument(instance);
- actionHandler.scheduleClustering(instance);
- }));
-
- app.get(HoodieTableManagerClient.REMOVE_CLUSTERING, new ViewHandler(ctx -> {
- Instance instance = Instance.builder()
- .basePath(ctx.validatedQueryParam(HoodieTableManagerClient.BASEPATH_PARAM).getOrThrow())
- .dbName(ctx.validatedQueryParam(HoodieTableManagerClient.DATABASE_NAME_PARAM).getOrThrow())
- .tableName(ctx.validatedQueryParam(HoodieTableManagerClient.TABLE_NAME_PARAM).getOrThrow())
- .instant(ctx.validatedQueryParam(HoodieTableManagerClient.INSTANT_PARAM).getOrThrow())
- .status(InstanceStatus.INVALID.getStatus())
- .isDeleted(true)
- .build();
- actionHandler.removeClustering(instance);
- }));
- }
-
- /**
- * Wraps a handler to time each request and log its outcome.
- */
- private class ViewHandler implements Handler {
-
- private final Handler handler;
-
- ViewHandler(Handler handler) {
- this.handler = handler;
- }
-
- @Override
- public void handle(@NotNull Context context) throws Exception {
- boolean success = true;
- long beginTs = System.currentTimeMillis();
- boolean synced = false;
- long refreshCheckTimeTaken = 0;
- long handleTimeTaken = 0;
- long finalCheckTimeTaken = 0;
- try {
- long handleBeginMs = System.currentTimeMillis();
- handler.handle(context);
- long handleEndMs = System.currentTimeMillis();
- handleTimeTaken = handleEndMs - handleBeginMs;
- } catch (RuntimeException re) {
- success = false;
- LOG.error("Got runtime exception servicing request " + context.queryString(), re);
- throw re;
- } finally {
- long endTs = System.currentTimeMillis();
- long timeTakenMillis = endTs - beginTs;
- LOG.info(String.format(
- "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
- + "Success=%s, Query=%s, Host=%s, synced=%s",
- timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success,
- context.queryString(), context.host(), synced));
- }
- }
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java
deleted file mode 100644
index 3fbf6b2db6..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/TableManagementServer.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.table.management.common.ServiceConfig;
-import org.apache.hudi.table.management.common.TableManagementServiceConfig;
-import org.apache.hudi.table.management.service.BaseService;
-import org.apache.hudi.table.management.service.CleanService;
-import org.apache.hudi.table.management.service.ExecutorService;
-import org.apache.hudi.table.management.service.MonitorService;
-import org.apache.hudi.table.management.service.RetryService;
-import org.apache.hudi.table.management.service.ScheduleService;
-import org.apache.hudi.table.management.store.MetadataStore;
-
-import com.beust.jcommander.JCommander;
-import io.javalin.Javalin;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A standalone table management service.
- */
-public class TableManagementServer {
-
- private static final Logger LOG = LoggerFactory.getLogger(TableManagementServer.class);
-
- private int serverPort;
- private final Configuration conf;
- private final TableManagementServiceConfig config;
- private transient Javalin app = null;
- private List<BaseService> services;
- private MetadataStore metadataStore;
-
- public TableManagementServer(int serverPort, Configuration conf, TableManagementServiceConfig config)
- throws IOException {
- this.config = config;
- this.conf = FSUtils.prepareHadoopConf(conf);
- this.fs = FileSystem.get(conf);
- this.serverPort = serverPort;
- this.metadataStore = initMetadataStore();
- }
-
- public TableManagementServer(TableManagementServiceConfig config) throws IOException {
- this(config.serverPort, new Configuration(), config);
- }
-
- public int startService() throws IOException {
- app = Javalin.create();
- RequestHandler requestHandler = new RequestHandler(app, conf, metadataStore);
- app.get("/", ctx -> ctx.result("Hello World"));
- requestHandler.register();
- app.start(serverPort);
- registerService();
- initAndStartRegisterService();
- return serverPort;
- }
-
- private MetadataStore initMetadataStore() {
- String className = ServiceConfig.getInstance()
- .getString(ServiceConfig.ServiceConfVars.MetadataStoreClass);
- MetadataStore metadataStore = ReflectionUtils.loadClass(className);
- metadataStore.init();
- LOG.info("Finish init metastore: " + className);
- return metadataStore;
- }
-
- private void registerService() {
- services = new ArrayList<>();
- ExecutorService executorService = new ExecutorService();
- services.add(executorService);
- services.add(new ScheduleService(executorService, metadataStore));
- services.add(new RetryService(metadataStore));
- services.add(new MonitorService());
- services.add(new CleanService());
- }
-
- private void initAndStartRegisterService() {
- for (BaseService service : services) {
- service.init();
- service.startService();
- }
- }
-
- private void stopRegisterService() {
- for (BaseService service : services) {
- service.stop();
- }
- }
-
- public void run() throws IOException {
- startService();
- Runtime.getRuntime()
- .addShutdownHook(
- new Thread(
- () -> {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- System.err.println(
- "*** shutting down Table management service since JVM is shutting down");
- try {
- TableManagementServer.this.stop();
- } catch (InterruptedException e) {
- e.printStackTrace(System.err);
- }
- System.err.println("*** Table management service shut down");
- }));
- }
-
- /**
- * Stop serving requests and shutdown resources.
- */
- public void stop() throws InterruptedException {
- LOG.info("Stopping Table management Service");
- this.app.stop();
- this.app = null;
- stopRegisterService();
- LOG.info("Stopped Table management Service");
- }
-
- public static void main(String[] args) throws Exception {
- final TableManagementServiceConfig cfg = new TableManagementServiceConfig();
- JCommander cmd = new JCommander(cfg, null, args);
- if (cfg.help) {
- cmd.usage();
- System.exit(1);
- }
- TableManagementServer service = new TableManagementServer(cfg);
- service.run();
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java
deleted file mode 100644
index c33e88b103..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/EnvConstant.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.common;
-
-public class EnvConstant {
- public static final String JAVA_HOME = "JAVA_HOME";
- public static final String YARN_CONF_DIR = "YARN_CONF_DIR";
-
- public static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";
- public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
-
- public static final String SPARK_HOME = "SPARK_HOME";
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java
deleted file mode 100644
index d60cf05bee..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceConfig.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.common;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class ServiceConfig extends Properties {
-
- private static Logger LOG = LoggerFactory.getLogger(ServiceConfig.class);
- private static final String HOODIE_ENV_PROPS_PREFIX = "hoodie_";
-
- private static ServiceConfig CONFIG = new ServiceConfig();
-
- /**
- * Constructor.
- */
- private ServiceConfig() {
- LOG.info("Start init ServiceConfig");
- Map<String, String> envs = System.getenv();
- for (Map.Entry<String, String> env : envs.entrySet()) {
- if (env.getKey().toLowerCase().startsWith(HOODIE_ENV_PROPS_PREFIX)) {
- String key = env.getKey().toLowerCase().replace("_", ".table.management.");
- String value = env.getValue().trim();
- setProperty(key, value);
- LOG.info("Set property " + key + " to " + value);
- }
- }
- LOG.info("Finish init ServiceConfig");
- }
-
- public String getString(ServiceConfVars confVars) {
- return this.getProperty(confVars.key(), confVars.defVal());
- }
-
- public void setString(ServiceConfVars confVars, String value) {
- this.setProperty(confVars.key(), value);
- }
-
- public Boolean getBool(ServiceConfVars confVars) {
- return Boolean.valueOf(this.getProperty(confVars.key(), confVars.defVal()));
- }
-
- public int getInt(ServiceConfVars confVars) {
- return Integer.parseInt(this.getProperty(confVars.key(), confVars.defVal()));
- }
-
- public static ServiceConfig getInstance() {
- return CONFIG;
- }
-
- public enum ServiceConfVars {
- JavaHome("hoodie.table.management.java.home", ""),
- SparkHome("hoodie.table.management.spark.home", ""),
- YarnConfDir("hoodie.table.management.yarn.conf.dir", ""),
- HadoopConfDir("hoodie.table.management.hadoop.conf.dir", ""),
- CompactionMainClass("hoodie.table.management.compaction.main.class", "org.apache.hudi.utilities.HoodieCompactor"),
- CompactionScheduleWaitInterval("hoodie.table.management.schedule.wait.interval", "30000"),
- IntraMaxFailTolerance("hoodie.table.management.max.fail.tolerance", "5"),
- MaxRetryNum("hoodie.table.management.instance.max.retry", "3"),
- MetadataStoreClass("hoodie.table.management.metadata.store.class",
- "org.apache.hudi.table.management.store.impl.RelationDBBasedStore"),
- CompactionCacheEnable("hoodie.table.management.compaction.cache.enable", "true"),
- RetryTimes("hoodie.table.management.retry.times", "5"),
- SparkSubmitJarPath("hoodie.table.management.submit.jar.path", "/tmp/hoodie_submit_jar/spark/"),
- SparkShuffleHdfsEnabled("hoodie.table.management.spark.shuffle.hdfs.enabled", "true"),
- SparkParallelism("hoodie.table.management.spark.parallelism", "1"),
- SparkMaster("hoodie.table.management.spark.parallelism", "local[1]"),
- SparkVcoreBoost("hoodie.table.management.spark.vcore.boost", "1"),
- SparkVcoreBoostRatio("hoodie.table.management.spark.vcore.boost.ratio", "1"),
- SparkSpeculation("hoodie.table.management.spark.speculation", "false"),
- ExecutorMemory("hoodie.table.management.executor.memory", "20g"),
- DriverMemory("hoodie.table.management.driver.memory", "20g"),
- ExecutorMemoryOverhead("hoodie.table.management.executor.memory.overhead", "5g"),
- ExecutorCores("hoodie.table.management.executor.cores", "1"),
- MinExecutors("hoodie.table.management.min.executors", "5"),
- MaxExecutors("hoodie.table.management.max.executors", "1000"),
- CoreExecuteSize("hoodie.table.management.core.executor.pool.size", "300"),
- MaxExecuteSize("hoodie.table.management.max.executor.pool.size", "1000");
-
- private final String key;
- private final String defaultVal;
-
- ServiceConfVars(String key, String defaultVal) {
- this.key = key;
- this.defaultVal = defaultVal;
- }
-
- public String key() {
- return this.key;
- }
-
- public String defVal() {
- return this.defaultVal;
- }
- }
-
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java
deleted file mode 100644
index cd5240da36..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/ServiceContext.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.common;
-
-import org.apache.hudi.table.management.store.jdbc.InstanceDao;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class ServiceContext {
-
- private static ConcurrentHashMap<String, String> runningInstance = new ConcurrentHashMap<>();
-
- public static void addRunningInstance(String instanceIdentifier, String threadIdentifier) {
- runningInstance.put(instanceIdentifier, threadIdentifier);
- }
-
- public static void removeRunningInstance(String instanceIdentifier) {
- runningInstance.remove(instanceIdentifier);
- }
-
- public static int getRunningInstanceNum() {
- return runningInstance.size();
- }
-
- public static List<String> getRunningInstanceInfo() {
- List<String> runningInfos = new ArrayList<>();
- for (Map.Entry<String, String> instance : runningInstance.entrySet()) {
- runningInfos.add("instance " + instance.getKey() + " execution on " + instance.getValue());
- }
- return runningInfos;
- }
-
- private static ConcurrentHashMap<String, Long> pendingInstances = new ConcurrentHashMap<>();
-
- public static boolean containsPendingInstant(String key) {
- return pendingInstances.containsKey(key);
- }
-
- public static void refreshPendingInstant(String key) {
- pendingInstances.put(key, System.currentTimeMillis());
- }
-
- public static void removePendingInstant(String key) {
- pendingInstances.remove(key);
- }
-
- public static ConcurrentHashMap<String, Long> getPendingInstances() {
- return pendingInstances;
- }
-
- public static InstanceDao getInstanceDao() {
- return ServiceContextHolder.INSTANCE_DAO;
- }
-
- private static class ServiceContextHolder {
- private static final InstanceDao INSTANCE_DAO = new InstanceDao();
- }
-
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java
deleted file mode 100644
index c756631f05..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/common/TableManagementServiceConfig.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.common;
-
-import com.beust.jcommander.Parameter;
-
-public class TableManagementServiceConfig {
- @Parameter(names = {"--server-port", "-p"}, description = " Server Port")
- public Integer serverPort = 26755;
-
- @Parameter(names = {"--help", "-h"})
- public Boolean help = false;
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java
deleted file mode 100644
index e84f75666a..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Action.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.entity;
-
-public enum Action {
- COMPACTION(0),
- CLUSTERING(1);
-
- private final int value;
-
- Action(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return this.value;
- }
-
- public static void checkActionType(Instance instance) {
- for (Action action : Action.values()) {
- if (action.getValue() == instance.getAction()) {
- return;
- }
- }
- throw new RuntimeException("Invalid action type: " + instance);
- }
-
- public static Action getAction(int actionValue) {
- for (Action action : Action.values()) {
- if (action.getValue() == actionValue) {
- return action;
- }
- }
- throw new RuntimeException("Invalid instance action: " + actionValue);
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java
deleted file mode 100644
index d7e8b9fe93..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/AssistQueryEntity.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.entity;
-
-import org.apache.hudi.table.management.common.ServiceConfig;
-import org.apache.hudi.table.management.util.DateTimeUtils;
-
-import lombok.Getter;
-
-import java.util.Date;
-
-@Getter
-public class AssistQueryEntity {
-
- private int maxRetry = ServiceConfig.getInstance()
- .getInt(ServiceConfig.ServiceConfVars.MaxRetryNum);
-
- private Date queryStartTime = DateTimeUtils.addDay(-3);
-
- private int status;
-
- public AssistQueryEntity() {
-
- }
-
- public AssistQueryEntity(int status, Date queryStartTime) {
- this.status = status;
- this.queryStartTime = queryStartTime;
- }
-
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java
deleted file mode 100644
index edcb45c0ae..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Engine.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.entity;
-
-public enum Engine {
-
- SPARK(0),
- FLINK(1);
-
- private final int value;
-
- Engine(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return this.value;
- }
-
- public static void checkEngineType(Instance instance) {
- for (Engine engine : Engine.values()) {
- if (engine.equals(instance.getExecutionEngine())) {
- return;
- }
- }
- throw new RuntimeException("Invalid engine type: " + instance);
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java
deleted file mode 100644
index 91cfeb6ce5..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/Instance.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.entity;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
-
-import java.util.Date;
-
-@Builder
-@Getter
-@Setter
-@ToString
-@NoArgsConstructor
-@AllArgsConstructor
-public class Instance {
-
- private long id;
-
- private String dbName;
-
- private String tableName;
-
- private String basePath;
-
- private Engine executionEngine;
-
- private String owner;
-
- private String queue;
-
- private String resource;
-
- private String parallelism;
-
- private String instant;
-
- private int action;
-
- private int status;
-
- private int runTimes;
-
- private String applicationId;
-
- private String doradoJobId;
-
- private Date scheduleTime;
-
- private Date createTime;
-
- private Date updateTime;
-
- private boolean isDeleted;
-
- public String getFullTableName() {
- return dbName + "." + tableName;
- }
-
- public String getIdentifier() {
- return dbName + "." + tableName + "." + instant + "." + status;
- }
-
- public String getInstanceRunStatus() {
- return dbName + "." + tableName + "." + instant + "." + status + "."
- + runTimes + "." + updateTime;
- }
-
- public String getRecordKey() {
- return dbName + "." + tableName + "." + instant;
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java
deleted file mode 100644
index 4577cd236b..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/entity/InstanceStatus.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.entity;
-
-public enum InstanceStatus {
-
- SCHEDULED(0, "scheduled"),
- RUNNING(1, "running"),
- FAILED(2, "failed"),
- INVALID(3, "invalid"),
- COMPLETED(4, "completed");
-
- private int status;
- private String desc;
-
- InstanceStatus(int status, String desc) {
- this.status = status;
- this.desc = desc;
- }
-
- public int getStatus() {
- return status;
- }
-
- public void setStatus(int status) {
- this.status = status;
- }
-
- public String getDesc() {
- return desc;
- }
-
- public void setDesc(String desc) {
- this.desc = desc;
- }
-
- public static InstanceStatus getInstance(int status) {
- for (InstanceStatus instanceStatus : InstanceStatus.values()) {
- if (instanceStatus.getStatus() == status) {
- return instanceStatus;
- }
- }
- throw new RuntimeException("Invalid instance status: " + status);
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java
deleted file mode 100644
index 84db007854..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/exception/HoodieTableManagementException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.exception;
-
-import org.apache.hudi.exception.HoodieException;
-
-public class HoodieTableManagementException extends HoodieException {
-
- public HoodieTableManagementException(String msg) {
- super(msg);
- }
-
- public HoodieTableManagementException(String msg, Throwable e) {
- super(msg, e);
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java
deleted file mode 100644
index e3ac837492..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/BaseActionExecutor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.executor;
-
-import org.apache.hudi.table.management.common.ServiceConfig;
-import org.apache.hudi.table.management.common.ServiceContext;
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.entity.InstanceStatus;
-import org.apache.hudi.table.management.executor.submitter.ExecutionEngine;
-import org.apache.hudi.table.management.executor.submitter.SparkEngine;
-import org.apache.hudi.table.management.store.jdbc.InstanceDao;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class BaseActionExecutor implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(BaseActionExecutor.class);
-
- protected InstanceDao instanceDao;
- protected Instance instance;
- public int maxFailTolerance;
- protected ExecutionEngine engine;
-
- public BaseActionExecutor(Instance instance) {
- this.instance = instance;
- this.instanceDao = ServiceContext.getInstanceDao();
- this.maxFailTolerance = ServiceConfig.getInstance()
- .getInt(ServiceConfig.ServiceConfVars.IntraMaxFailTolerance);
- String mainClass = ServiceConfig.getInstance()
- .getString(ServiceConfig.ServiceConfVars.CompactionMainClass);
- switch (instance.getExecutionEngine()) {
- case SPARK:
- engine = new SparkEngine(getJobName(instance), instance, mainClass);
- break;
- case FLINK:
- default:
- throw new IllegalStateException("Unexpected value: " + instance.getExecutionEngine());
- }
- }
-
- @Override
- public void run() {
- ServiceContext.addRunningInstance(instance.getRecordKey(), getThreadIdentifier());
- try {
- execute();
- } finally {
- ServiceContext.removeRunningInstance(instance.getRecordKey());
- if (ServiceConfig.getInstance()
- .getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable)) {
- ServiceContext.removePendingInstant(instance.getRecordKey());
- }
- }
- }
-
- public abstract boolean doExecute();
-
- public abstract String getJobName(Instance instance);
-
- public void execute() {
- try {
- boolean success = doExecute();
- if (success) {
- instance.setStatus(InstanceStatus.COMPLETED.getStatus());
- LOG.info("Success exec instance: " + instance.getIdentifier());
- } else {
- instance.setStatus(InstanceStatus.FAILED.getStatus());
- LOG.info("Fail exec instance: " + instance.getIdentifier());
- }
- } catch (Exception e) {
- instance.setStatus(InstanceStatus.FAILED.getStatus());
- LOG.error("Fail exec instance: " + instance.getIdentifier() + ", errMsg: ", e);
- }
- instanceDao.updateStatus(instance);
- }
-
- public String getThreadIdentifier() {
- return Thread.currentThread().getId() + "." + Thread.currentThread().getName() + "."
- + Thread.currentThread().getState();
- }
-
- @Override
- public String toString() {
- return this.getClass().getName() + ", instance: " + instance.getIdentifier();
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java
deleted file mode 100644
index 9e21663651..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/CompactionExecutor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.executor;
-
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.entity.InstanceStatus;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CompactionExecutor extends BaseActionExecutor {
-
- private static final Logger LOG = LoggerFactory.getLogger(CompactionExecutor.class);
-
- public static final String COMPACT_JOB_NAME = "Hoodie compact %s.%s %s";
-
- public CompactionExecutor(Instance instance) {
- super(instance);
- }
-
- @Override
- public boolean doExecute() {
- String jobName = getJobName(instance);
- LOG.info("Start exec : " + jobName);
- instance.setStatus(InstanceStatus.RUNNING.getStatus());
- instanceDao.saveInstance(instance);
- String applicationId = engine.execute(jobName, instance);
- if (StringUtils.isNullOrEmpty(applicationId)) {
- LOG.warn("Failed to run compaction for " + jobName);
- return false;
- }
-
- LOG.info("Compaction successfully completed for " + jobName);
- return true;
- }
-
- @Override
- public String getJobName(Instance instance) {
- return String.format(COMPACT_JOB_NAME, instance.getDbName(), instance.getTableName(),
- instance.getInstant());
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java
deleted file mode 100644
index c4fdb98faf..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/ExecutionEngine.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.executor.submitter;
-
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.exception.HoodieTableManagementException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-public abstract class ExecutionEngine {
-
- private static final Logger LOG = LoggerFactory.getLogger(ExecutionEngine.class);
-
- protected static final String YARN_SUBMITTED = "Submitted application";
-
- public String execute(String jobName, Instance instance) throws HoodieTableManagementException {
- try {
- LOG.info("Submitting instance {}:{}", jobName, instance.getIdentifier());
- beforeExecuteCommand();
- return executeCommand(jobName, instance);
- } catch (Exception e) {
- throw new HoodieTableManagementException("Failed submit instance " + instance, e);
- }
- }
-
- protected String executeCommand(String jobName, Instance instance) {
- String command = "";
- try {
- command = getCommand();
- LOG.info("Execute command: {}", command);
- Map<String, String> env = setProcessEnv();
- LOG.info("Execute env: {}", env);
-
- return "-1";
-
-// ExecuteHelper executeHelper = new ExecuteHelper(command, jobName, env);
-// CompletableFuture<Void> executeFuture = executeHelper.getExecuteThread();
-// executeFuture.whenComplete((Void ignored, Throwable throwable) -> executeHelper.closeProcess());
-// while (!executeFuture.isDone()) {
-// LOG.info("Waiting for execute job " + jobName);
-// TimeUnit.SECONDS.sleep(5);
-// }
-// if (executeHelper.isSuccess) {
-// LOG.info("Execute job {} command success", jobName);
-// } else {
-// LOG.info("Execute job {} command failed", jobName);
-// }
-// return executeHelper.applicationId;
- } catch (Exception e) {
- LOG.error("Execute command error with exception: ", e);
- throw new HoodieTableManagementException("Execute " + command + " command error", e);
- }
- }
-
- protected abstract String getCommand() throws IOException;
-
- protected abstract void beforeExecuteCommand();
-
- public abstract Map<String, String> setProcessEnv();
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java
deleted file mode 100644
index 99babcd2f5..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/executor/submitter/SparkEngine.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.executor.submitter;
-
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.table.management.common.EnvConstant;
-import org.apache.hudi.table.management.common.ServiceConfig;
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.exception.HoodieTableManagementException;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.hudi.cli.utils.SparkUtil.initLauncher;
-
-/**
- * {@link ExecutionEngine} that submits a table-management {@link Instance} to Spark.
- *
- * <p>{@link #getCommand()} assembles a {@code spark-submit} command line from {@link ServiceConfig}
- * and the instance, while {@link #executeCommand(String, Instance)} launches the job through
- * {@link SparkLauncher} and scans the launcher output for the YARN application id.
- */
-public class SparkEngine extends ExecutionEngine {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SparkEngine.class);
-
-  private final String jobName;
-  private final Instance instance;
-  private final String mainClass;
-
-  /**
-   * @param jobName   name passed to Spark via {@code --name}
-   * @param instance  instance whose queue, owner, parallelism and resource settings drive the submission
-   * @param mainClass fully qualified main class passed to {@code spark-submit --class}
-   */
-  public SparkEngine(String jobName, Instance instance, String mainClass) {
-    this.jobName = jobName;
-    this.instance = instance;
-    this.mainClass = mainClass;
-  }
-
-  /**
-   * Builds the {@code spark-submit} command line (YARN, cluster deploy mode).
-   *
-   * @throws IOException if the submit jar path cannot be resolved
-   */
-  @Override
-  protected String getCommand() throws IOException {
-    String format = "%s/bin/spark-submit --class %s --master yarn --deploy-mode cluster %s %s %s";
-    return String.format(format, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome),
-        mainClass, getSparkArgs(), getSubmitJar(), getJobArgs());
-  }
-
-  /** No preparation is required before submitting a Spark job. */
-  @Override
-  protected void beforeExecuteCommand() {
-
-  }
-
-  /**
-   * Returns the environment for the submit process: Java/Spark homes and the YARN/Hadoop conf dirs
-   * from {@link ServiceConfig}, plus the instance owner as {@code HADOOP_USER_NAME}.
-   */
-  @Override
-  public Map<String, String> setProcessEnv() {
-    Map<String, String> env = new HashMap<>(16);
-    env.put(EnvConstant.JAVA_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.JavaHome));
-    env.put(EnvConstant.YARN_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.YarnConfDir));
-    env.put(EnvConstant.SPARK_HOME, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkHome));
-    env.put(EnvConstant.HADOOP_CONF_DIR, ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.HadoopConfDir));
-    env.put(EnvConstant.HADOOP_USER_NAME, instance.getOwner());
-    return env;
-  }
-
-  /**
-   * Returns the application arguments appended after the submit jar.
-   * TODO: not implemented yet; an empty string is returned (rather than {@code null}) so that
-   * {@link #getCommand()} does not format the literal text "null" into the command line.
-   */
-  private String getJobArgs() throws IOException {
-    return "";
-  }
-
-  /**
-   * Resolves the single jar found in the configured submit-jar directory.
-   *
-   * @return canonical path of the jar
-   * @throws HoodieTableManagementException if the path is not a directory or does not contain exactly one jar
-   * @throws IOException if the canonical path cannot be computed
-   */
-  private String getSubmitJar() throws IOException {
-    File sparkSubmitJarPath = new File(ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSubmitJarPath));
-    if (!sparkSubmitJarPath.isDirectory()) {
-      throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath + " should be a directory");
-    }
-    // Keep only the *.jar entries; the directory is expected to hold exactly one of them.
-    File[] jars = sparkSubmitJarPath.listFiles(file -> file.getName().endsWith(".jar"));
-    if (jars == null || jars.length != 1) {
-      throw new HoodieTableManagementException("Spark submit jar path " + sparkSubmitJarPath
-          + " should only have one jar, jars = " + Arrays.toString(jars));
-    }
-    return jars[0].getCanonicalPath();
-  }
-
-  /**
-   * Builds the {@code --queue}, {@code --name} and {@code --conf key=value} arguments.
-   *
-   * <p>The instance resource is either empty (configured driver/executor memory is used), a single
-   * value (used for both driver and executor) or {@code driver:executor}.
-   *
-   * @throws RuntimeException if the resource string has more than two ':'-separated parts or no parts at all
-   */
-  private String getSparkArgs() {
-    StringBuilder sparkArgs = new StringBuilder();
-    sparkArgs.append("--queue ").append(instance.getQueue());
-    sparkArgs.append(" --name ").append(jobName);
-
-    Map<String, String> sparkParams = new HashMap<>();
-    sparkParams.put("mapreduce.job.queuename", instance.getQueue());
-    sparkParams.put("spark.shuffle.hdfs.enabled", ServiceConfig.getInstance()
-        .getString(ServiceConfig.ServiceConfVars.SparkShuffleHdfsEnabled));
-    String parallelism = StringUtils.isNullOrEmpty(instance.getParallelism())
-        ? ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MaxExecutors)
-        : instance.getParallelism();
-    sparkParams.put("spark.dynamicAllocation.maxExecutors", parallelism);
-    sparkParams.put("spark.dynamicAllocation.minExecutors",
-        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.MinExecutors));
-    sparkParams.put("spark.vcore.boost",
-        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoost));
-    sparkParams.put("spark.vcore.boost.ratio",
-        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkVcoreBoostRatio));
-    sparkParams.put("spark.speculation",
-        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkSpeculation));
-    String driverResource;
-    String executorResource;
-    // Trim only after the null check: an instance without a resource setting falls back to the defaults.
-    String rawResource = instance.getResource();
-    String resource = rawResource == null ? "" : rawResource.trim();
-    if (StringUtils.isNullOrEmpty(resource)) {
-      driverResource = ServiceConfig.getInstance()
-          .getString(ServiceConfig.ServiceConfVars.DriverMemory);
-      executorResource = ServiceConfig.getInstance()
-          .getString(ServiceConfig.ServiceConfVars.ExecutorMemory);
-    } else {
-      String[] resourceArray = resource.split(":");
-      if (resourceArray.length == 1) {
-        driverResource = resourceArray[0];
-        executorResource = resourceArray[0];
-      } else if (resourceArray.length == 2) {
-        driverResource = resourceArray[0];
-        executorResource = resourceArray[1];
-      } else {
-        throw new RuntimeException(
-            "Invalid conf: " + instance.getIdentifier() + ", resource: " + resource);
-      }
-    }
-    sparkParams.put("spark.executor.cores",
-        ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorCores));
-    sparkParams.put("spark.executor.memory", executorResource);
-    sparkParams.put("spark.driver.memory", driverResource);
-    sparkParams.put("spark.executor.memoryOverhead", ServiceConfig.getInstance()
-        .getString(ServiceConfig.ServiceConfVars.ExecutorMemoryOverhead));
-
-    for (Map.Entry<String, String> entry : sparkParams.entrySet()) {
-      sparkArgs
-          .append(" --conf ")
-          .append(entry.getKey())
-          .append("=")
-          .append(entry.getValue());
-    }
-
-    return sparkArgs.toString();
-  }
-
-  /**
-   * Launches a {@code COMPACT_RUN} job through {@link SparkLauncher} and reads the launcher output
-   * until a line containing {@code YARN_SUBMITTED} is seen; the text following that marker is taken
-   * as the application id. The launcher process is destroyed once scanning finishes.
-   *
-   * @param jobName  job name used in log and error messages
-   * @param instance instance providing base path, table name and instant
-   * @return the application id, or {@code null} if the output ended before the marker appeared
-   * @throws HoodieTableManagementException if the launcher cannot be initialised or started, or if
-   *                                        reading its output fails
-   */
-  @Override
-  public String executeCommand(String jobName, Instance instance) throws HoodieTableManagementException {
-    String sparkPropertiesPath =
-        Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
-    SparkLauncher sparkLauncher;
-    try {
-      sparkLauncher = initLauncher(sparkPropertiesPath);
-    } catch (URISyntaxException e) {
-      LOG.error("Failed to init spark launcher", e);
-      throw new HoodieTableManagementException("Failed to init spark launcher", e);
-    }
-
-    String master = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkMaster);
-    String sparkMemory = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.ExecutorMemory);
-    String parallelism = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.SparkParallelism);
-    String retry = ServiceConfig.getInstance().getString(ServiceConfig.ServiceConfVars.RetryTimes);
-
-    sparkLauncher.addAppArgs("COMPACT_RUN", master, sparkMemory, instance.getBasePath(),
-        instance.getTableName(), instance.getInstant(), parallelism, "", retry, "");
-
-    Process process;
-    try {
-      process = sparkLauncher.launch();
-    } catch (IOException e) {
-      LOG.error("Failed to launch spark process", e);
-      throw new HoodieTableManagementException("Failed to launch spark process", e);
-    }
-
-    InputStream inputStream = null;
-    BufferedReader bufferedReader = null;
-    String applicationId = null;
-    try {
-      inputStream = process.getInputStream();
-      bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
-      String line;
-      while ((line = bufferedReader.readLine()) != null) {
-        LOG.info(line);
-        // Literal search (consistent with a contains() check) instead of a regex split, which
-        // could throw when nothing follows the marker.
-        int markerStart = line.indexOf(YARN_SUBMITTED);
-        if (markerStart >= 0) {
-          String candidate = line.substring(markerStart + YARN_SUBMITTED.length()).trim();
-          if (!candidate.isEmpty()) {
-            applicationId = candidate;
-            LOG.info("Execute job {} get application id {}", jobName, applicationId);
-            break;
-          }
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("execute {} process get application id error", jobName, e);
-      throw new HoodieTableManagementException("execute " + jobName + " process get application id error", e);
-    } finally {
-      // Only the local launcher process is terminated here.
-      process.destroyForcibly();
-      IOUtils.closeQuietly(bufferedReader);
-      IOUtils.closeQuietly(inputStream);
-    }
-
-    return applicationId;
-  }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java
deleted file mode 100644
index 62e8ca45dc..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ActionHandler.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.handlers;
-
-import org.apache.hudi.table.management.common.ServiceConfig;
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.store.MetadataStore;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Entry point for table-service actions: holds the Hadoop configuration, file system and
- * {@link MetadataStore}, and forwards compaction requests to a {@link CompactionHandler}.
- * Clustering is not supported yet.
- */
-public class ActionHandler implements AutoCloseable {
-  private static final Logger LOG = LoggerFactory.getLogger(ActionHandler.class);
-
-  protected final Configuration conf;
-  protected final FileSystem fileSystem;
-  protected final MetadataStore metadataStore;
-
-  private final CompactionHandler compactionHandler;
-
-  /**
-   * @param conf          Hadoop configuration used to obtain the file system
-   * @param metadataStore store in which instances are registered and updated
-   * @throws IOException if the file system cannot be obtained
-   */
-  public ActionHandler(Configuration conf,
-                       MetadataStore metadataStore) throws IOException {
-    this.conf = conf;
-    this.fileSystem = FileSystem.get(conf);
-    this.metadataStore = metadataStore;
-    boolean cacheEnable = ServiceConfig.getInstance().getBool(ServiceConfig.ServiceConfVars.CompactionCacheEnable);
-    this.compactionHandler = new CompactionHandler(cacheEnable);
-  }
-
-  /** Registers a compaction instance through the compaction handler. */
-  public void scheduleCompaction(Instance instance) {
-    compactionHandler.scheduleCompaction(metadataStore, instance);
-  }
-
-  /** Removes a compaction instance through the compaction handler. */
-  public void removeCompaction(Instance instance) throws IOException {
-    compactionHandler.removeCompaction(metadataStore, instance);
-  }
-
-  // TODO: support clustering
-  /** Currently a no-op. */
-  public void scheduleClustering(Instance instance) {
-
-  }
-
-  /** Currently a no-op. */
-  public void removeClustering(Instance instance) {
-
-  }
-
-  /**
-   * Closes the file system obtained in the constructor.
-   * NOTE(review): {@code FileSystem.get(conf)} normally returns a JVM-wide cached instance, so
-   * closing it here may affect other users of the same file system; confirm whether
-   * {@code FileSystem.newInstance(conf)} is more appropriate.
-   */
-  @Override
-  public void close() throws Exception {
-    this.fileSystem.close();
-  }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java
deleted file mode 100644
index 1afc973902..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/ClusteringHandler.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.handlers;
-
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.store.MetadataStore;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * REST Handler servicing clustering requests.
- */
-public class ClusteringHandler {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ClusteringHandler.class);
-
-  /**
-   * Registers a clustering instance by saving it to the metadata store.
-   *
-   * @param metadataStore store the instance is saved to
-   * @param instance      clustering instance to register
-   */
-  public void scheduleClustering(MetadataStore metadataStore,
-                                 Instance instance) {
-    LOG.info("Start register clustering instance: " + instance.getIdentifier());
-    metadataStore.saveInstance(instance);
-  }
-
-  /**
-   * Removes a clustering instance: verifies that it exists in the store, then updates its status.
-   *
-   * @param metadataStore store holding the instance
-   * @param instance      clustering instance to remove
-   * @throws RuntimeException if the instance is not present in the store
-   */
-  public void removeClustering(MetadataStore metadataStore,
-                               Instance instance) {
-    LOG.info("Start remove clustering instance: " + instance.getIdentifier());
-    // 1. check instance exist
-    Instance result = metadataStore.getInstance(instance);
-    if (result == null) {
-      throw new RuntimeException("Instance not exist: " + instance);
-    }
-    // 2. update status
-    metadataStore.updateStatus(instance);
-  }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java
deleted file mode 100644
index 4edf962617..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/handlers/CompactionHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.handlers;
-
-import org.apache.hudi.table.management.common.ServiceContext;
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.store.MetadataStore;
-
-import org.jetbrains.annotations.NotNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * REST Handler servicing compaction requests.
- */
-public class CompactionHandler {
-  private static Logger LOG = LoggerFactory.getLogger(CompactionHandler.class);
-  protected boolean cacheEnable;
-
-  /**
-   * @param cacheEnable whether the pending-instant cache in {@link ServiceContext} is consulted and refreshed
-   */
-  public CompactionHandler(boolean cacheEnable) {
-    this.cacheEnable = cacheEnable;
-  }
-
-  /**
-   * Registers a compaction instance unless it is already known, either through the pending-instant
-   * cache (when enabled) or through the metadata store. When the cache is enabled the record key is
-   * refreshed in it regardless of the outcome.
-   */
-  public void scheduleCompaction(MetadataStore metadataStore,
-                                 Instance instance) {
-    final String key = instance.getRecordKey();
-    LOG.info("Start register compaction instance: " + key);
-
-    // The store is only queried when the cache (if enabled) does not already know the key.
-    final boolean alreadyRegistered = (cacheEnable && ServiceContext.containsPendingInstant(key))
-        || metadataStore.getInstance(instance) != null;
-    if (!alreadyRegistered) {
-      metadataStore.saveInstance(instance);
-    } else {
-      LOG.warn("Instance has existed, instance: " + instance);
-    }
-
-    if (cacheEnable) {
-      ServiceContext.refreshPendingInstant(key);
-    }
-  }
-
-  /**
-   * Removes a compaction instance: fails if the store does not know it, otherwise updates its status.
-   *
-   * @throws RuntimeException if the instance is not present in the store
-   */
-  public void removeCompaction(@NotNull MetadataStore metadataStore,
-                               Instance instance) {
-    LOG.info("Start remove compaction instance: " + instance.getIdentifier());
-    final Instance stored = metadataStore.getInstance(instance);
-    if (stored == null) {
-      throw new RuntimeException("Instance not exist: " + instance);
-    }
-    metadataStore.updateStatus(instance);
-  }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java
deleted file mode 100644
index 5855535e36..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/BaseService.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.service;
-
-/**
- * Lifecycle contract shared by the background services in this package.
- * The implementations in this package create their executors in {@code init()}, schedule or submit
- * their work in {@code startService()} and shut the executors down in {@code stop()}.
- */
-public interface BaseService {
-
- /** Prepares the service; implementations here create their executor(s) in this step. */
- void init();
-
- /** Starts the service's background work; the implementations here expect {@code init()} to have run. */
- void startService();
-
- /** Stops the service and shuts down its executor(s). */
- void stop();
-
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java
deleted file mode 100644
index b792dfee75..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/CleanService.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.service;
-
-import org.apache.hudi.table.management.common.ServiceContext;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Background service that periodically evicts expired entries from the pending-instance cache
- * held by {@link ServiceContext}.
- */
-public class CleanService implements BaseService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(CleanService.class);
-  private ScheduledExecutorService service;
-  // Entries older than this many milliseconds are considered expired.
-  private final long cacheInterval = 3600 * 1000; //ms
-
-  @Override
-  public void init() {
-    LOG.info("Init service: " + CleanService.class.getName());
-    //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Clean-Service-%d").build();
-    this.service = Executors.newSingleThreadScheduledExecutor();
-  }
-
-  /** Schedules the cache clean-up: first run after 30 seconds, then every 300 seconds. */
-  @Override
-  public void startService() {
-    LOG.info("Start service: " + CleanService.class.getName());
-    service.scheduleAtFixedRate(this::cleanCacheQuietly, 30, 300, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public void stop() {
-    LOG.info("Stop service: " + CleanService.class.getName());
-    if (service != null && !service.isShutdown()) {
-      service.shutdown();
-    }
-  }
-
-  /**
-   * Runs one clean-up pass without letting a failure escape: an exception thrown from a task
-   * scheduled with scheduleAtFixedRate suppresses all subsequent executions.
-   */
-  private void cleanCacheQuietly() {
-    try {
-      cleanCache();
-    } catch (RuntimeException e) {
-      LOG.error("Failed to clean pending instance cache", e);
-    }
-  }
-
-  /** Removes every pending instance whose timestamp is older than {@code cacheInterval}. */
-  private void cleanCache() {
-    long currentTime = System.currentTimeMillis();
-    ConcurrentHashMap<String, Long> pendingInstances = ServiceContext.getPendingInstances();
-    for (Map.Entry<String, Long> instance : pendingInstances.entrySet()) {
-      Long lastRefresh = instance.getValue();
-      // Conditional remove: an entry that was refreshed concurrently (new timestamp) is left alone.
-      if (currentTime - lastRefresh > cacheInterval
-          && pendingInstances.remove(instance.getKey(), lastRefresh)) {
-        LOG.info("Instance has expired: " + instance.getKey());
-      }
-    }
-  }
-
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java
deleted file mode 100644
index 919199a51f..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ExecutorService.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.service;
-
-import org.apache.hudi.table.management.common.ServiceConfig;
-import org.apache.hudi.table.management.common.ServiceContext;
-import org.apache.hudi.table.management.executor.BaseActionExecutor;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Service that runs submitted {@link BaseActionExecutor} tasks: tasks are queued by
- * {@link #submitTask(BaseActionExecutor)} and a single dispatcher thread hands them to a
- * bounded worker pool.
- */
-public class ExecutorService implements BaseService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ExecutorService.class);
-
-  private ThreadPoolExecutor executorService;
-  private ScheduledExecutorService service;
-  private BlockingQueue<BaseActionExecutor> taskQueue;
-  private int coreExecuteSize;
-  private int maxExecuteSize;
-
-  /** Creates the dispatcher, the worker pool (sized from {@link ServiceConfig}) and the task queue. */
-  @Override
-  public void init() {
-    service = Executors.newSingleThreadScheduledExecutor();
-    coreExecuteSize = ServiceConfig.getInstance()
-        .getInt(ServiceConfig.ServiceConfVars.CoreExecuteSize);
-    maxExecuteSize = ServiceConfig.getInstance()
-        .getInt(ServiceConfig.ServiceConfVars.MaxExecuteSize);
-    //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Executor-Service-%d").build();
-    executorService = new ThreadPoolExecutor(coreExecuteSize, maxExecuteSize, 60,
-        TimeUnit.SECONDS, new SynchronousQueue<>());
-    taskQueue = new LinkedBlockingQueue<>();
-    LOG.info("Init service: " + ExecutorService.class.getName() + ", coreExecuteSize: "
-        + coreExecuteSize + ", maxExecuteSize: " + maxExecuteSize);
-  }
-
-  /** Starts the dispatcher thread that drains the task queue. */
-  @Override
-  public void startService() {
-    LOG.info("Start service: " + ExecutorService.class.getName());
-    service.submit(new ExecutionTask());
-  }
-
-  /**
-   * Stops the dispatcher first (so nothing new is handed to the pool) and then shuts the worker
-   * pool down, letting tasks that are already running finish.
-   */
-  @Override
-  public void stop() {
-    LOG.info("Stop service: " + ExecutorService.class.getName());
-    if (service != null && !service.isShutdown()) {
-      // The dispatcher blocks in taskQueue.take(); only an interrupt (shutdownNow) lets it exit.
-      service.shutdownNow();
-    }
-    if (executorService != null && !executorService.isShutdown()) {
-      executorService.shutdown();
-    }
-    LOG.info("Finish stop service: " + ExecutorService.class.getName());
-  }
-
-  /** Dispatcher loop: takes queued tasks and hands them to the worker pool until interrupted. */
-  private class ExecutionTask implements Runnable {
-
-    @Override
-    public void run() {
-      while (!Thread.currentThread().isInterrupted()) {
-        try {
-          BaseActionExecutor executor = taskQueue.take();
-          LOG.info("Start execute: " + executor);
-          executorService.execute(executor);
-        } catch (InterruptedException interruptedException) {
-          LOG.info("Execution dispatcher interrupted, stop dispatching");
-          // Restore the interrupt flag and leave the loop so the thread can terminate.
-          Thread.currentThread().interrupt();
-          return;
-        } catch (RuntimeException e) {
-          // e.g. the pool rejected the task; keep the dispatcher alive for the remaining tasks.
-          LOG.error("Occur exception when exec job", e);
-        }
-      }
-    }
-  }
-
-  /** Queues a task for execution by the dispatcher. */
-  public void submitTask(BaseActionExecutor task) {
-    taskQueue.add(task);
-  }
-
-  /** Returns the configured maximum pool size minus the number of running instances reported by {@link ServiceContext}. */
-  public int getFreeSize() {
-    return maxExecuteSize - ServiceContext.getRunningInstanceNum();
-  }
-
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java
deleted file mode 100644
index 164549a38b..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/MonitorService.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.service;
-
-import org.apache.hudi.table.management.common.ServiceContext;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Background service that periodically logs the running-instance information reported by
- * {@link ServiceContext}.
- */
-public class MonitorService implements BaseService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MonitorService.class);
-
-  private ScheduledExecutorService service;
-
-  @Override
-  public void init() {
-    // getName() keeps this line consistent with the start/stop messages (no "class " prefix).
-    LOG.info("Init service: " + MonitorService.class.getName());
-    this.service = Executors.newSingleThreadScheduledExecutor();
-  }
-
-  /** Schedules the monitor: first run after 30 seconds, then every 180 seconds. */
-  @Override
-  public void startService() {
-    LOG.info("Start service: " + MonitorService.class.getName());
-    service.scheduleAtFixedRate(new MonitorRunnable(), 30, 180, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public void stop() {
-    LOG.info("Stop service: " + MonitorService.class.getName());
-    if (service != null && !service.isShutdown()) {
-      service.shutdown();
-    }
-  }
-
-  /** Logs one line per entry returned by {@code ServiceContext.getRunningInstanceInfo()}. */
-  private class MonitorRunnable implements Runnable {
-
-    @Override
-    public void run() {
-      for (String info : ServiceContext.getRunningInstanceInfo()) {
-        LOG.info(info);
-      }
-    }
-  }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java
deleted file mode 100644
index 17bc3bb5df..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/RetryService.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.service;
-
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.entity.InstanceStatus;
-import org.apache.hudi.table.management.store.MetadataStore;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Background service that periodically moves the instances the {@link MetadataStore} reports as
- * retryable back to the {@code SCHEDULED} status.
- */
-public class RetryService implements BaseService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RetryService.class);
-
-  private MetadataStore metadataStore;
-  private ScheduledExecutorService service;
-
-  /**
-   * @param metadataStore store queried for retryable instances and updated with their new status
-   */
-  public RetryService(MetadataStore metadataStore) {
-    this.metadataStore = metadataStore;
-  }
-
-  @Override
-  public void init() {
-    LOG.info("Init service: " + RetryService.class.getName());
-    //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Retry-Service-%d").build();
-    this.service = Executors.newSingleThreadScheduledExecutor();
-  }
-
-  /** Schedules the retry pass: first run after 30 seconds, then every 180 seconds. */
-  @Override
-  public void startService() {
-    LOG.info("Start service: " + RetryService.class.getName());
-    final Runnable retryPass = this::submitFailTask;
-    service.scheduleAtFixedRate(retryPass, 30, 180, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public void stop() {
-    LOG.info("Stop service: " + RetryService.class.getName());
-    final boolean running = service != null && !service.isShutdown();
-    if (running) {
-      service.shutdown();
-    }
-  }
-
-  /**
-   * Fetches the retryable instances from the store, marks each one as {@code SCHEDULED} and
-   * persists the new status.
-   */
-  public void submitFailTask() {
-    final List<Instance> retryCandidates = metadataStore.getRetryInstances();
-    for (Instance candidate : retryCandidates) {
-      LOG.info("Start retry instance: " + candidate.getIdentifier());
-      candidate.setStatus(InstanceStatus.SCHEDULED.getStatus());
-      metadataStore.updateStatus(candidate);
-    }
-  }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java
deleted file mode 100644
index a7e1d708b5..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/service/ScheduleService.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.service;
-
-import org.apache.hudi.table.management.common.ServiceConfig;
-import org.apache.hudi.table.management.entity.Action;
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.entity.InstanceStatus;
-import org.apache.hudi.table.management.exception.HoodieTableManagementException;
-import org.apache.hudi.table.management.executor.BaseActionExecutor;
-import org.apache.hudi.table.management.executor.CompactionExecutor;
-import org.apache.hudi.table.management.store.MetadataStore;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class ScheduleService implements BaseService {
-
- private static final Logger LOG = LoggerFactory.getLogger(ScheduleService.class);
-
- private ScheduledExecutorService service;
- private ExecutorService executionService;
- private MetadataStore metadataStore;
- private int compactionWaitInterval;
-
- public ScheduleService(ExecutorService executionService,
- MetadataStore metadataStore) {
- this.executionService = executionService;
- this.metadataStore = metadataStore;
- this.compactionWaitInterval = ServiceConfig.getInstance()
- .getInt(ServiceConfig.ServiceConfVars.CompactionScheduleWaitInterval);
- }
-
- @Override
- public void init() {
- LOG.info("Finish init schedule service, compactionWaitInterval: " + compactionWaitInterval);
- //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Schedule-Service-%d").build();
- this.service = Executors.newSingleThreadScheduledExecutor();
- }
-
- @Override
- public void startService() {
- LOG.info("Start service: " + ScheduleService.class.getName());
- service.scheduleAtFixedRate(new ScheduleRunnable(), 30, 60, TimeUnit.SECONDS);
- }
-
- @Override
- public void stop() {
- LOG.info("Stop service: " + ScheduleService.class.getName());
- if (service != null && !service.isShutdown()) {
- service.shutdown();
- }
- }
-
- private class ScheduleRunnable implements Runnable {
-
- @Override
- public void run() {
- submitReadyTask();
- }
- }
-
- public void submitReadyTask() {
- int limitSize = executionService.getFreeSize();
- LOG.info("Start get ready instances, limitSize: " + limitSize);
- if (limitSize > 0) {
- List<Instance> readyInstances = metadataStore.getInstances(
- InstanceStatus.SCHEDULED.getStatus(), limitSize);
- for (Instance readyInstance : readyInstances) {
- if (waitSchedule(readyInstance)) {
- LOG.info("Instance should wait schedule: " + readyInstance.getInstanceRunStatus());
- continue;
- }
- LOG.info("Schedule ready instances: " + readyInstance.getInstanceRunStatus());
- BaseActionExecutor executor = getActionExecutor(readyInstance);
- executionService.submitTask(executor);
- }
- }
- }
-
- private boolean waitSchedule(Instance instance) {
- return instance.getAction() == Action.COMPACTION.getValue()
- && instance.getUpdateTime().getTime() + compactionWaitInterval
- > System.currentTimeMillis();
- }
-
- protected BaseActionExecutor getActionExecutor(Instance instance) {
- if (instance.getAction() == Action.COMPACTION.getValue()) {
- return new CompactionExecutor(instance);
- } else {
- throw new HoodieTableManagementException("Unsupported action " + instance.getAction());
- }
- }
-
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java
deleted file mode 100644
index 8d730212cc..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/MetadataStore.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.store;
-
-import org.apache.hudi.table.management.entity.AssistQueryEntity;
-import org.apache.hudi.table.management.entity.Instance;
-
-import java.util.List;
-
-public interface MetadataStore {
-
- void saveInstance(Instance instance);
-
- void updateStatus(Instance instance);
-
- void init();
-
- Instance getInstance(Instance instance);
-
- List<Instance> getInstances(int status, int limit);
-
- List<Instance> getRetryInstances();
-
- List<Instance> getAlertInstances(AssistQueryEntity queryEntity);
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java
deleted file mode 100644
index ac42fe92aa..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/impl/RelationDBBasedStore.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.store.impl;
-
-import org.apache.hudi.table.management.entity.AssistQueryEntity;
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.store.MetadataStore;
-import org.apache.hudi.table.management.store.jdbc.InstanceDao;
-
-import java.util.List;
-
-public class RelationDBBasedStore implements MetadataStore {
-
- private final InstanceDao instanceDao;
-
- public RelationDBBasedStore() {
- this.instanceDao = new InstanceDao();
- }
-
- @Override
- public void saveInstance(Instance instance) {
- instanceDao.saveInstance(instance);
- }
-
- @Override
- public void updateStatus(Instance instance) {
- instanceDao.updateStatus(instance);
- }
-
- @Override
- public void init() {
- // do nothing
- }
-
- @Override
- public Instance getInstance(Instance instance) {
- return instanceDao.getInstance(instance);
- }
-
- @Override
- public List<Instance> getInstances(int status, int limit) {
- return instanceDao.getInstances(status, limit);
- }
-
- @Override
- public List<Instance> getRetryInstances() {
- return instanceDao.getRetryInstances();
- }
-
- @Override
- public List<Instance> getAlertInstances(AssistQueryEntity queryEntity) {
- return instanceDao.getAlertInstances(queryEntity);
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java
deleted file mode 100644
index e43fc21b8c..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/HikariDataSourceFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.store.jdbc;
-
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
-import org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory;
-import org.apache.ibatis.io.Resources;
-
-import java.io.IOException;
-import java.util.Properties;
-
-public class HikariDataSourceFactory extends UnpooledDataSourceFactory {
- private static final String PROPERTIES_PATH = "hikariPool.properties";
-
- public HikariDataSourceFactory() throws IOException {
- Properties properties = new Properties();
- properties.load(Resources.getResourceAsStream(PROPERTIES_PATH));
- HikariConfig config = new HikariConfig(properties);
- this.dataSource = new HikariDataSource(config);
- }
-}
\ No newline at end of file
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java
deleted file mode 100644
index 4c106138f7..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/InstanceDao.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.store.jdbc;
-
-import org.apache.hudi.table.management.entity.AssistQueryEntity;
-import org.apache.hudi.table.management.entity.Instance;
-import org.apache.hudi.table.management.entity.InstanceStatus;
-
-import org.apache.ibatis.session.RowBounds;
-import org.apache.ibatis.session.SqlSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class InstanceDao {
-
- private static Logger LOG = LoggerFactory.getLogger(InstanceDao.class);
-
- private static final String NAMESPACE = "Instance";
-
- public void saveInstance(Instance instance) {
- try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
- sqlSession.insert(statement(NAMESPACE, "saveInstance"), instance);
- sqlSession.commit();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void updateStatus(Instance instance) {
- try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
- int ret = sqlSession.update(statement(NAMESPACE, getUpdateStatusSqlId(instance)), instance);
- sqlSession.commit();
- if (ret != 1) {
- LOG.error("Fail update status instance: " + instance);
- throw new RuntimeException("Fail update status instance: " + instance.getIdentifier());
- }
- LOG.info("Success update status instance: " + instance.getIdentifier());
- } catch (Exception e) {
- LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e);
- throw new RuntimeException(e);
- }
- }
-
- public void updateExecutionInfo(Instance instance) {
- int retryNum = 0;
- try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
- while (retryNum++ < 3) {
- int ret = sqlSession.update(statement(NAMESPACE, "updateExecutionInfo"), instance);
- sqlSession.commit();
- if (ret != 1) {
- LOG.warn("Fail update execution info instance: " + instance);
- TimeUnit.SECONDS.sleep(5);
- } else {
- LOG.info("Success update execution info, instance: " + instance.getIdentifier());
- return;
- }
- }
- throw new RuntimeException("Fail update execution info: " + instance.getIdentifier());
- } catch (Exception e) {
- LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e);
- throw new RuntimeException(e);
- }
- }
-
- public Instance getInstance(Instance instance) {
- try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
- return sqlSession.selectOne(statement(NAMESPACE, "getInstance"), instance);
- } catch (Exception e) {
- LOG.error("Fail get Instance: " + instance.getIdentifier() + ", errMsg: ", e);
- throw new RuntimeException(e);
- }
- }
-
- private String getUpdateStatusSqlId(Instance instance) {
- switch (InstanceStatus.getInstance(instance.getStatus())) {
- case SCHEDULED:
- return "retryInstance";
- case RUNNING:
- return "runningInstance";
- case COMPLETED:
- return "successInstance";
- case FAILED:
- return "failInstance";
- case INVALID:
- return "invalidInstance";
- default:
- throw new RuntimeException("Invalid instance: " + instance.getIdentifier());
- }
- }
-
- public List<Instance> getInstances(int status, int limit) {
- try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
- if (limit > 0) {
- return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status,
- new RowBounds(0, limit));
- } else {
- return sqlSession.selectList(statement(NAMESPACE, "getInstances"), status);
- }
- } catch (Exception e) {
- LOG.error("Fail get instances, status: " + status + ", errMsg: ", e);
- throw new RuntimeException("Fail get instances, status: " + status);
- }
- }
-
- public List<Instance> getRetryInstances() {
- try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
- return sqlSession.selectList(statement(NAMESPACE, "getRetryInstances"),
- new AssistQueryEntity());
- } catch (Exception e) {
- LOG.error("Fail get retry instances, errMsg: ", e);
- throw new RuntimeException("Fail get retry instances");
- }
- }
-
- public List<Instance> getAlertInstances(AssistQueryEntity queryEntity) {
- try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
- return sqlSession.selectList(statement(NAMESPACE, "getAlertInstances"),
- queryEntity);
- } catch (Exception e) {
- LOG.error("Fail get alert instances, errMsg: ", e);
- throw new RuntimeException("Fail get alert instances");
- }
- }
-
- public List<Instance> getInstanceAfterTime(AssistQueryEntity queryEntity) {
- try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) {
- return sqlSession.selectList(statement(NAMESPACE, "getInstanceAfterTime"), queryEntity);
- } catch (Exception e) {
- LOG.error("Fail get instances after time, errMsg: ", e);
- throw new RuntimeException("Fail get alert instances");
- }
- }
-
- private String statement(String namespace, String sqlID) {
- return namespace + "." + sqlID;
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java
deleted file mode 100644
index 4dea08332c..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/store/jdbc/SqlSessionFactoryUtil.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.store.jdbc;
-
-import org.apache.hudi.table.management.exception.HoodieTableManagementException;
-
-import org.apache.ibatis.io.Resources;
-import org.apache.ibatis.session.SqlSession;
-import org.apache.ibatis.session.SqlSessionFactory;
-import org.apache.ibatis.session.SqlSessionFactoryBuilder;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.sql.PreparedStatement;
-import java.util.stream.Collectors;
-
-public class SqlSessionFactoryUtil {
-
- private static final String CONFIG_PATH = "mybatis-config.xml";
-
- private static SqlSessionFactory sqlSessionFactory;
- private static final Class<?> CLASS_LOCK = SqlSessionFactoryUtil.class;
-
- private SqlSessionFactoryUtil() {
-
- }
-
- public static void initSqlSessionFactory() {
- try (InputStream inputStream = Resources.getResourceAsStream(CONFIG_PATH)) {
- synchronized (CLASS_LOCK) {
- if (sqlSessionFactory == null) {
- sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream);
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public static SqlSession openSqlSession() {
- if (sqlSessionFactory == null) {
- initSqlSessionFactory();
- init();
- }
- return sqlSessionFactory.openSession();
- }
-
- public static void init() {
- try {
- String[] ddls = org.apache.commons.io.IOUtils.readLines(
- SqlSessionFactoryUtil.class.getResourceAsStream("/table-management-service.sql"))
- .stream().filter(e -> !e.startsWith("--"))
- .collect(Collectors.joining(""))
- .split(";");
- for (String ddl : ddls) {
- try (PreparedStatement statement = SqlSessionFactoryUtil.openSqlSession().getConnection()
- .prepareStatement(ddl)) {
- statement.execute();
- }
- }
- } catch (Exception e) {
- throw new HoodieTableManagementException("Unable to read init ddl file", e);
- }
- }
-
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java
deleted file mode 100644
index 763047a26e..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/DateTimeUtils.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.util;
-
-import java.util.Calendar;
-import java.util.Date;
-
-public class DateTimeUtils {
-
- public static Date addDay(int amount) {
- Calendar c = Calendar.getInstance();
- c.setTime(new Date());
- c.add(Calendar.DATE, amount);
- return c.getTime();
- }
-}
diff --git a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java b/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java
deleted file mode 100644
index 27139bf6f5..0000000000
--- a/hudi-table-management-service/src/main/java/org/apache/hudi/table/management/util/InstanceUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.management.util;
-
-import org.apache.hudi.table.management.entity.Action;
-import org.apache.hudi.table.management.entity.Engine;
-import org.apache.hudi.table.management.entity.Instance;
-
-public class InstanceUtil {
-
- public static void checkArgument(Instance instance) {
- if (instance.getExecutionEngine() == null) {
- instance.setExecutionEngine(Engine.SPARK);
- }
- Engine.checkEngineType(instance);
- Action.checkActionType(instance);
- }
-}
diff --git a/hudi-table-management-service/src/main/resources/hikariPool.properties b/hudi-table-management-service/src/main/resources/hikariPool.properties
deleted file mode 100644
index a14104a127..0000000000
--- a/hudi-table-management-service/src/main/resources/hikariPool.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-###
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-###
-jdbcUrl=jdbc:h2:mem:tms;MODE=MYSQL
-dataSource.user=root
-dataSource.password=password
\ No newline at end of file
diff --git a/hudi-table-management-service/src/main/resources/logback.xml b/hudi-table-management-service/src/main/resources/logback.xml
deleted file mode 100644
index f4d55b0ab7..0000000000
--- a/hudi-table-management-service/src/main/resources/logback.xml
+++ /dev/null
@@ -1,41 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<configuration>
-
- <include resource="com/bytedance/logback/agent/base.xml"/>
- <include resource="com/bytedance/logback/agent/agent-appender.xml"/>
-
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
- <layout class="ch.qos.logback.classic.PatternLayout">
- <pattern>
- %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n
- </pattern>
- </layout>
- </encoder>
- </appender>
-
- <root level="INFO">
- <appender-ref ref="STDOUT"/>
- <appender-ref ref="FILE"/>
- <appender-ref ref="AGENT"/>
- </root>
-
- <logger name="org.apache.spark" level="WARN"/>
-
-</configuration>
\ No newline at end of file
diff --git a/hudi-table-management-service/src/main/resources/mybatis-config.xml b/hudi-table-management-service/src/main/resources/mybatis-config.xml
deleted file mode 100644
index d9b6fc581b..0000000000
--- a/hudi-table-management-service/src/main/resources/mybatis-config.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd">
-<configuration>
-
- <settings>
- <setting name="lazyLoadingEnabled" value="false" />
- <setting name="callSettersOnNulls" value="true"/>
- <setting name="logImpl" value="STDOUT_LOGGING" />
- </settings>
-
- <typeAliases>
-
- </typeAliases>
-
- <environments default="development">
- <environment id="development">
- <transactionManager type="JDBC"/>
- <dataSource type="org.apache.hudi.table.management.store.jdbc.HikariDataSourceFactory"/>
- </environment>
- </environments>
-
- <mappers>
- <mapper resource="mybatis/Instance.xml"/>
- </mappers>
-
-</configuration>
diff --git a/hudi-table-management-service/src/main/resources/mybatis/Instance.xml b/hudi-table-management-service/src/main/resources/mybatis/Instance.xml
deleted file mode 100644
index c0d5d86d70..0000000000
--- a/hudi-table-management-service/src/main/resources/mybatis/Instance.xml
+++ /dev/null
@@ -1,165 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
- "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-
-<mapper namespace="Instance">
-
- <resultMap type="org.apache.hudi.table.management.entity.Instance" id="InstanceMapping">
- <result column="id" property="id" javaType="java.lang.Long"/>
- <result column="db_name" property="dbName"/>
- <result column="table_name" property="tableName"/>
- <result column="base_path" property="basePath"/>
- <result column="execution_engine" property="executionEngine"/>
- <result column="owner" property="owner"/>
- <result column="cluster" property="cluster"/>
- <result column="queue" property="queue"/>
- <result column="resource" property="resource"/>
- <result column="parallelism" property="parallelism"/>
- <result column="auto_clean" property="autoClean"/>
- <result column="instant" property="instant"/>
- <result column="action" property="action" javaType="java.lang.Integer"/>
- <result column="status" property="status" javaType="java.lang.Integer"/>
- <result column="run_times" property="runTimes" javaType="java.lang.Integer"/>
- <result column="application_id" property="applicationId"/>
- <result column="dorado_job_id" property="doradoJobId"/>
- <result column="schedule_time" property="scheduleTime" javaType="java.util.Date"/>
- <result column="create_time" property="createTime" javaType="java.util.Date"/>
- <result column="update_time" property="updateTime" javaType="java.util.Date"/>
- </resultMap>
-
- <sql id="selectColumns">
- id, db_name, table_name, base_path, execution_engine, owner, cluster, queue, resource, parallelism, auto_clean,
- instant, action, status, run_times, application_id, dorado_job_id, schedule_time, create_time, update_time
- </sql>
-
- <insert id="saveInstance"
- parameterType="org.apache.hudi.table.management.entity.Instance"
- useGeneratedKeys="true" keyProperty="id">
- INSERT INTO instance (db_name, table_name, base_path, execution_engine, owner, cluster,
- queue, resource, parallelism, auto_clean, instant, action, status, run_times)
- VALUES (#{dbName}, #{tableName}, #{basePath}, #{executionEngine}, #{owner}, #{cluster},
- #{queue},#{resource}, #{parallelism}, #{autoClean}, #{instant}, #{action}, #{status}, 0)
- </insert>
-
- <update id="runningInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
- UPDATE instance
- SET status = #{status},
- schedule_time = now(),
- run_times = run_times + 1
- WHERE db_name = #{dbName}
- and table_name = #{tableName}
- and instant = #{instant}
- and status = 0
- </update>
-
- <update id="retryInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
- UPDATE instance
- SET status = #{status}
- WHERE db_name = #{dbName}
- and table_name = #{tableName}
- and instant = #{instant}
- </update>
-
- <update id="updateExecutionInfo"
- parameterType="org.apache.hudi.table.management.entity.Instance">
- UPDATE instance
- SET dorado_job_id = #{doradoJobId},
- application_id = #{applicationId}
- WHERE db_name = #{dbName}
- and table_name = #{tableName}
- and instant = #{instant}
- </update>
-
- <update id="successInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
- UPDATE instance
- SET status = #{status}
- WHERE db_name = #{dbName}
- and table_name = #{tableName}
- and instant = #{instant}
- and status = 1
- </update>
-
- <update id="failInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
- UPDATE instance
- SET status = #{status}
- WHERE db_name = #{dbName}
- and table_name = #{tableName}
- and instant = #{instant}
- and status = 1
- </update>
-
- <update id="invalidInstance" parameterType="org.apache.hudi.table.management.entity.Instance">
- UPDATE instance
- SET status = #{status}
- WHERE db_name = #{dbName}
- and table_name = #{tableName}
- and instant = #{instant}
- </update>
-
- <select id="getInstance" parameterType="org.apache.hudi.table.management.entity.Instance"
- resultMap="InstanceMapping">
- SELECT <include refid="selectColumns"/>
- FROM instance
- WHERE db_name = #{dbName}
- and table_name = #{tableName}
- and instant = #{instant}
- </select>
-
- <select id="getInstances" parameterType="java.lang.Integer"
- resultMap="InstanceMapping">
- SELECT
- <include refid="selectColumns"/>
- FROM instance
- WHERE status = #{status}
- order by id
- </select>
-
- <select id="getRetryInstances" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity"
- resultMap="InstanceMapping">
- SELECT
- <include refid="selectColumns"/>
- FROM instance
- WHERE status = 2
- and run_times <![CDATA[ <= ]]> #{maxRetry}
- and update_time > #{queryStartTime}
- order by id
- </select>
-
- <select id="getAlertInstances" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity"
- resultMap="InstanceMapping">
- SELECT
- <include refid="selectColumns"/>
- FROM instance
- WHERE status = #{status}
- and run_times > #{maxRetry}
- and update_time > #{queryStartTime}
- order by id
- </select>
-
- <select id="getInstanceAfterTime" parameterType="org.apache.hudi.table.management.entity.AssistQueryEntity"
- resultMap="InstanceMapping">
- SELECT
- <include refid="selectColumns"/>
- FROM instance
- WHERE status = #{status}
- and update_time > #{queryStartTime}
- order by id
- </select>
-
-</mapper>
diff --git a/hudi-table-management-service/src/main/resources/table-management-service.sql b/hudi-table-management-service/src/main/resources/table-management-service.sql
deleted file mode 100644
index 243880b2d9..0000000000
--- a/hudi-table-management-service/src/main/resources/table-management-service.sql
+++ /dev/null
@@ -1,46 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements. See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership. The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License. You may obtain a copy of the License at
---
--- http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-
-CREATE TABLE if not exists `instance`
-(
- `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
- `db_name` varchar(128) NOT NULL COMMENT 'db name',
- `table_name` varchar(128) NOT NULL COMMENT 'table name',
- `base_path` varchar(128) NOT NULL COMMENT 'base path',
- `execution_engine` varchar(128) NOT NULL COMMENT 'execution engine',
- `owner` varchar(128) NOT NULL COMMENT 'owner',
- `cluster` varchar(128) NOT NULL COMMENT 'cluster',
- `queue` varchar(128) NOT NULL COMMENT 'queue',
- `resource` varchar(128) NOT NULL COMMENT 'resource',
- `parallelism` varchar(128) NOT NULL COMMENT 'parallelism',
- `auto_clean` int NOT NULL DEFAULT '0' COMMENT 'auto_clean',
- `instant` varchar(128) NOT NULL COMMENT 'instant',
- `action` int NOT NULL COMMENT 'action',
- `status` int NOT NULL COMMENT 'status',
- `run_times` int NOT NULL DEFAULT '0' COMMENT 'run times',
- `application_id` varchar(128) DEFAULT NULL COMMENT 'application id',
- `dorado_job_id` varchar(128) DEFAULT NULL COMMENT 'job id',
- `schedule_time` timestamp NULL DEFAULT NULL COMMENT 'schedule time',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
- `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
- PRIMARY KEY (`id`),
- UNIQUE KEY `uniq_table_instant` (`db_name`,`table_name`,`instant`),
- KEY `idx_status` (`status`),
- KEY `idx_update_time_status` (`update_time`,`status`)
-) COMMENT='Table Management Service instance';
-
diff --git a/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties b/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties
deleted file mode 100644
index b21b5d4070..0000000000
--- a/hudi-table-management-service/src/test/resources/log4j-surefire-quiet.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-###
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-###
-log4j.rootLogger=WARN, CONSOLE
-log4j.logger.org.apache.hudi=DEBUG
-
-# CONSOLE is set to be a ConsoleAppender.
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-# CONSOLE uses PatternLayout.
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n
-log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
-log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
-log4j.appender.CONSOLE.filter.a.LevelMin=WARN
-log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
diff --git a/hudi-table-management-service/src/test/resources/log4j-surefire.properties b/hudi-table-management-service/src/test/resources/log4j-surefire.properties
deleted file mode 100644
index c03e808cca..0000000000
--- a/hudi-table-management-service/src/test/resources/log4j-surefire.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-###
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-###
-log4j.rootLogger=WARN, CONSOLE
-log4j.logger.org.apache=INFO
-log4j.logger.org.apache.hudi=DEBUG
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-# A1 uses PatternLayout.
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
-log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
-log4j.appender.CONSOLE.filter.a.LevelMin=WARN
-log4j.appender.CONSOLE.filter.a.LevelMax=FATAL
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 96a86a0bb0..e3c8b3e8c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,6 @@
<module>hudi-hadoop-mr</module>
<module>hudi-spark-datasource</module>
<module>hudi-timeline-service</module>
- <module>hudi-table-management-service</module>
<module>hudi-utilities</module>
<module>hudi-sync</module>
<module>packaging/hudi-hadoop-mr-bundle</module>