You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/31 08:06:11 UTC
[incubator-inlong] branch master updated: [INLONG-4301][Manager] Support Oracle source (#4396)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 015630344 [INLONG-4301][Manager] Support Oracle source (#4396)
015630344 is described below
commit 0156303447995ccaa2576935b23261ae3539922b
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Tue May 31 16:06:05 2022 +0800
[INLONG-4301][Manager] Support Oracle source (#4396)
---
.../apache/inlong/common/enums/TaskTypeEnum.java | 24 +++--
.../inlong/manager/common/enums/SourceType.java | 4 +-
.../common/pojo/source/oracle/OracleSource.java | 78 ++++++++++++++++
.../common/pojo/source/oracle/OracleSourceDTO.java | 96 +++++++++++++++++++
.../source/oracle/OracleSourceListResponse.java | 69 ++++++++++++++
.../pojo/source/oracle/OracleSourceRequest.java | 73 +++++++++++++++
.../service/sort/util/ExtractNodeUtils.java | 49 +++++++++-
.../source/oracle/OracleSourceOperation.java | 101 ++++++++++++++++++++
.../core/source/OracleSourceServiceTest.java | 102 +++++++++++++++++++++
.../sort/protocol/constant/OracleConstant.java | 9 ++
10 files changed, 596 insertions(+), 9 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 6cb11d120..594ec3d17 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -17,19 +17,26 @@
package org.apache.inlong.common.enums;
-import static java.util.Objects.requireNonNull;
-
public enum TaskTypeEnum {
- DATABASE_MIGRATION(0),SQL(1), BINLOG(2), FILE(3), KAFKA(4), PULSAR(5), POSTGRES(6);
- private int type;
+ DATABASE_MIGRATION(0),
+ SQL(1),
+ BINLOG(2),
+ FILE(3),
+ KAFKA(4),
+ PULSAR(5),
+ POSTGRES(6),
+ ORACLE(7),
+
+ ;
+
+ private final int type;
TaskTypeEnum(int type) {
this.type = type;
}
public static TaskTypeEnum getTaskType(int taskType) {
- requireNonNull(taskType);
switch (taskType) {
case 0:
return DATABASE_MIGRATION;
@@ -45,12 +52,15 @@ public enum TaskTypeEnum {
return PULSAR;
case 6:
return POSTGRES;
+ case 7:
+ return ORACLE;
default:
- throw new RuntimeException("such task type doesn't exist");
+ throw new RuntimeException(String.format("Unsupported taskType=%s", taskType));
}
}
public int getType() {
return type;
}
-}
\ No newline at end of file
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 848d978a5..9f0fd2997 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -33,7 +33,8 @@ public enum SourceType {
BINLOG("BINLOG", TaskTypeEnum.BINLOG),
KAFKA("KAFKA", TaskTypeEnum.KAFKA),
PULSAR("PULSAR", TaskTypeEnum.PULSAR),
- POSTGRES("POSTGRES", TaskTypeEnum.POSTGRES);
+ POSTGRES("POSTGRES", TaskTypeEnum.POSTGRES),
+ ORACLE("ORACLE", TaskTypeEnum.ORACLE);
public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
public static final String SOURCE_FILE = "FILE";
@@ -42,6 +43,7 @@ public enum SourceType {
public static final String SOURCE_KAFKA = "KAFKA";
public static final String SOURCE_PULSAR = "PULSAR";
public static final String SOURCE_POSTGRES = "POSTGRES";
+ public static final String SOURCE_ORACLE = "ORACLE";
@Getter
private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSource.java
new file mode 100644
index 000000000..34f24daf5
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSource.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.oracle;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+
+/**
+ * Oracle source info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Oracle source info")
+public class OracleSource extends StreamSource {
+
+ @ApiModelProperty("Hostname of the DB server, for example: 127.0.0.1")
+ private String hostname;
+
+ @ApiModelProperty("Exposed port of the DB server")
+ private Integer port = 1521;
+
+ @ApiModelProperty("Username of the DB server")
+ private String username;
+
+ @ApiModelProperty("Password of the DB server")
+ private String password;
+
+ @ApiModelProperty("Database name")
+ private String database;
+
+ @ApiModelProperty("Schema name")
+ private String schemaName;
+
+ @ApiModelProperty("table name")
+ private String tableName;
+
+ @ApiModelProperty("Scan startup mode")
+ private String scanStartupMode;
+
+ @ApiModelProperty(value = "Primary key must be shared by all tables")
+ private String primaryKey;
+
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration = false;
+
+ public OracleSource() {
+ this.setSourceType(SourceType.ORACLE.name());
+ }
+
+ @Override
+ public SourceRequest genSourceRequest() {
+ return CommonBeanUtils.copyProperties(this, OracleSourceRequest::new);
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
new file mode 100644
index 000000000..2b125b7c9
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceDTO.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.oracle;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Oracle source info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class OracleSourceDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @ApiModelProperty("Hostname of the DB server, for example: 127.0.0.1")
+ private String hostname;
+
+ @ApiModelProperty("Exposed port of the DB server")
+ private Integer port = 1521;
+
+ @ApiModelProperty("Username of the DB server")
+ private String username;
+
+ @ApiModelProperty("Password of the DB server")
+ private String password;
+
+ @ApiModelProperty("Database name")
+ private String database;
+
+ @ApiModelProperty("Schema name")
+ private String schemaName;
+
+ @ApiModelProperty("table name")
+ private String tableName;
+
+ @ApiModelProperty("Scan startup mode")
+ private String scanStartupMode;
+
+ @ApiModelProperty(value = "Primary key must be shared by all tables")
+ private String primaryKey;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static OracleSourceDTO getFromRequest(OracleSourceRequest request) {
+ return OracleSourceDTO.builder()
+ .database(request.getDatabase())
+ .hostname(request.getHostname())
+ .port(request.getPort())
+ .username(request.getUsername())
+ .password(request.getPassword())
+ .schemaName(request.getSchemaName())
+ .tableName(request.getTableName())
+ .primaryKey(request.getPrimaryKey())
+ .scanStartupMode(request.getScanStartupMode())
+ .build();
+ }
+
+ public static OracleSourceDTO getFromJson(@NotNull String extParams) {
+ try {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, OracleSourceDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceListResponse.java
new file mode 100644
index 000000000..0bbb3b737
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceListResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.oracle;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+
+/**
+ * Response info of oracle source list
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of oracle source paging list")
+public class OracleSourceListResponse extends SourceListResponse {
+
+ @ApiModelProperty("Hostname of the DB server, for example: 127.0.0.1")
+ private String hostname;
+
+ @ApiModelProperty("Exposed port of the DB server")
+ private Integer port = 1521;
+
+ @ApiModelProperty("Username of the DB server")
+ private String username;
+
+ @ApiModelProperty("Password of the DB server")
+ private String password;
+
+ @ApiModelProperty("Database name")
+ private String database;
+
+ @ApiModelProperty("Schema name")
+ private String schemaName;
+
+ @ApiModelProperty("table name")
+ private String tableName;
+
+ @ApiModelProperty("Scan startup mode")
+ private String scanStartupMode;
+
+ @ApiModelProperty(value = "Primary key must be shared by all tables")
+ private String primaryKey;
+
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration = false;
+
+ public OracleSourceListResponse() {
+ this.setSourceType(SourceType.ORACLE.getType());
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceRequest.java
new file mode 100644
index 000000000..e37e0b348
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/oracle/OracleSourceRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.oracle;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of oracle source
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the oracle source info")
+@JsonTypeDefine(value = SourceType.SOURCE_ORACLE)
+public class OracleSourceRequest extends SourceRequest {
+
+ @ApiModelProperty("Hostname of the DB server, for example: 127.0.0.1")
+ private String hostname;
+
+ @ApiModelProperty("Exposed port of the DB server")
+ private Integer port = 1521;
+
+ @ApiModelProperty("Username of the DB server")
+ private String username;
+
+ @ApiModelProperty("Password of the DB server")
+ private String password;
+
+ @ApiModelProperty("Database name")
+ private String database;
+
+ @ApiModelProperty("Schema name")
+ private String schemaName;
+
+ @ApiModelProperty("table name")
+ private String tableName;
+
+ @ApiModelProperty("Scan startup mode")
+ private String scanStartupMode;
+
+ @ApiModelProperty(value = "Primary key must be shared by all tables")
+ private String primaryKey;
+
+ @ApiModelProperty("Need transfer total database")
+ private boolean allMigration = false;
+
+ public OracleSourceRequest() {
+ this.setSourceType(SourceType.ORACLE.toString());
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 9447b156a..9c2ddf00a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -29,15 +29,18 @@ import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
+import org.apache.inlong.manager.common.pojo.source.oracle.OracleSource;
import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSource;
import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
@@ -80,6 +83,8 @@ public class ExtractNodeUtils {
return createExtractNode((PulsarSource) sourceInfo);
case POSTGRES:
return createExtractNode((PostgresSource) sourceInfo);
+ case ORACLE:
+ return createExtractNode((OracleSource) sourceInfo);
default:
throw new IllegalArgumentException(
String.format("Unsupported sourceType=%s to create extractNode", sourceType));
@@ -239,7 +244,8 @@ public class ExtractNodeUtils {
format = new DebeziumJsonFormat();
break;
default:
- throw new IllegalArgumentException(String.format("Unsupported dataType=%s for kafka source", dataType));
+ throw new IllegalArgumentException(
+ String.format("Unsupported dataType=%s for pulsar source", dataType));
}
if (pulsarSource.isInlongComponent()) {
Format innerFormat = format;
@@ -283,4 +289,45 @@ public class ExtractNodeUtils {
postgresSource.getSchema(), postgresSource.getPort(),
postgresSource.getDecodingPluginName());
}
+
+ /**
+ * Create oracleExtractNode based on OracleSourceResponse
+ *
+ * @param oracleSource oracle source response info
+ * @return oracle extract node info
+ */
+ public static OracleExtractNode createExtractNode(OracleSource oracleSource) {
+ final String id = oracleSource.getSourceName();
+ final String name = oracleSource.getSourceName();
+ final String database = oracleSource.getDatabase();
+ final String schemaName = oracleSource.getSchemaName();
+ final String tableName = oracleSource.getTableName();
+ final String primaryKey = oracleSource.getPrimaryKey();
+ final String hostName = oracleSource.getHostname();
+ final String userName = oracleSource.getUsername();
+ final String password = oracleSource.getPassword();
+ final Integer port = oracleSource.getPort();
+ ScanStartUpMode scanStartupMode = StringUtils.isBlank(oracleSource.getScanStartupMode())
+ ? null : ScanStartUpMode.forName(oracleSource.getScanStartupMode());
+ List<StreamField> streamFieldInfos = oracleSource.getFieldList();
+ final List<FieldInfo> fieldInfos = streamFieldInfos.stream()
+ .map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
+ .collect(Collectors.toList());
+ Map<String, String> properties = Maps.newHashMap();
+ return new OracleExtractNode(id,
+ name,
+ fieldInfos,
+ null,
+ properties,
+ primaryKey,
+ hostName,
+ userName,
+ password,
+ database,
+ schemaName,
+ tableName,
+ port,
+ scanStartupMode
+ );
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperation.java
new file mode 100644
index 000000000..8d23d089f
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oracle/OracleSourceOperation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.oracle;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.source.oracle.OracleSource;
+import org.apache.inlong.manager.common.pojo.source.oracle.OracleSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.oracle.OracleSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.oracle.OracleSourceRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.function.Supplier;
+
+/**
+ * Oracle source operation
+ */
+@Service
+public class OracleSourceOperation extends AbstractSourceOperation {
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(SourceType sourceType) {
+ return SourceType.ORACLE == sourceType;
+ }
+
+ @Override
+ protected String getSourceType() {
+ return SourceType.ORACLE.getType();
+ }
+
+ @Override
+ protected StreamSource getSource() {
+ return new OracleSource();
+ }
+
+ @Override
+ public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
+ if (CollectionUtils.isEmpty(entityPage)) {
+ return new PageInfo<>();
+ }
+ return entityPage.toPageInfo(entity -> this.getFromEntity(entity, OracleSourceListResponse::new));
+ }
+
+ @Override
+ protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+ OracleSourceRequest sourceRequest = (OracleSourceRequest) request;
+ CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+ try {
+ OracleSourceDTO dto = OracleSourceDTO.getFromRequest(sourceRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+ @Override
+ public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+ T result = target.get();
+ if (entity == null) {
+ return result;
+ }
+ String existType = entity.getSourceType();
+ Preconditions.checkTrue(getSourceType().equals(existType),
+ String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
+ OracleSourceDTO dto = OracleSourceDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(entity, result, true);
+ CommonBeanUtils.copyProperties(dto, result, true);
+ return result;
+ }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
new file mode 100644
index 000000000..c467001a8
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.source;
+
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.source.oracle.OracleSource;
+import org.apache.inlong.manager.common.pojo.source.oracle.OracleSourceRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Oracle source service test
+ */
+public class OracleSourceServiceTest extends ServiceBaseTest {
+
+ private final String sourceName = "stream_source_service_test";
+ private static final String hostname = "127.0.0.1";
+ private static final Integer port = 1521;
+ private static final String database = "oracle_database";
+ private static final String schema = "oracle_schema";
+ private static final String tablename = "oracle_tablename";
+
+ @Autowired
+ private StreamSourceService sourceService;
+ @Autowired
+ private InlongStreamServiceTest streamServiceTest;
+
+ /**
+ * Save source info.
+ */
+ public Integer saveSource() {
+ streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
+
+ OracleSourceRequest sourceInfo = new OracleSourceRequest();
+ sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID);
+ sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
+ sourceInfo.setSourceName(sourceName);
+ sourceInfo.setSourceType(SourceType.ORACLE.getType());
+ sourceInfo.setHostname(hostname);
+ sourceInfo.setDatabase(database);
+ sourceInfo.setTableName(tablename);
+ sourceInfo.setPort(port);
+ sourceInfo.setSchemaName(schema);
+ return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
+ }
+
+ @Test
+ public void testSaveAndDelete() {
+ Integer id = this.saveSource();
+ Assert.assertNotNull(id);
+
+ boolean result = sourceService.delete(id, GLOBAL_OPERATOR);
+ Assert.assertTrue(result);
+ }
+
+ @Test
+ public void testListByIdentifier() {
+ Integer id = this.saveSource();
+
+ StreamSource source = sourceService.get(id);
+ Assert.assertEquals(GLOBAL_GROUP_ID, source.getInlongGroupId());
+
+ sourceService.delete(id, GLOBAL_OPERATOR);
+ }
+
+ @Test
+ public void testGetAndUpdate() {
+ Integer id = this.saveSource();
+ StreamSource response = sourceService.get(id);
+ Assert.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
+
+ OracleSource oracleSource = (OracleSource) response;
+
+ OracleSourceRequest request = CommonBeanUtils.copyProperties(oracleSource, OracleSourceRequest::new);
+ boolean result = sourceService.update(request, GLOBAL_OPERATOR);
+ Assert.assertTrue(result);
+
+ sourceService.delete(id, GLOBAL_OPERATOR);
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/OracleConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/OracleConstant.java
index f8143e9ba..0ac9fcf8c 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/OracleConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/OracleConstant.java
@@ -111,6 +111,15 @@ public class OracleConstant {
ScanStartUpMode(String value) {
this.value = value;
}
+
+ public static ScanStartUpMode forName(String name) {
+ for (ScanStartUpMode dataType : ScanStartUpMode.values()) {
+ if (dataType.getValue().equals(name)) {
+ return dataType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Unsupport ScanStartUpMode for oracle source:%s", name));
+ }
}
}