You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/02 08:48:37 UTC
[incubator-inlong] branch master updated: [INLONG-4273][Manager] Support MongoDB source (#4294)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4c4d2f528 [INLONG-4273][Manager] Support MongoDB source (#4294)
4c4d2f528 is described below
commit 4c4d2f528b64ddc2a95d041a26d8bd59662b6966
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Thu Jun 2 16:48:31 2022 +0800
[INLONG-4273][Manager] Support MongoDB source (#4294)
* [INLONG-4273][Manager] Support MongoDB source
* [INLONG-4273][Manager] Refactor MongoDB source codes
Co-authored-by: healchow <he...@gmail.com>
---
.../apache/inlong/common/enums/TaskTypeEnum.java | 4 +
.../inlong/manager/common/enums/SourceType.java | 6 +-
.../common/pojo/source/mongodb/MongoDBSource.java | 72 ++++++++++++
.../pojo/source/mongodb/MongoDBSourceDTO.java | 87 +++++++++++++++
.../source/mongodb/MongoDBSourceListResponse.java | 63 +++++++++++
.../pojo/source/mongodb/MongoDBSourceRequest.java | 61 +++++++++++
.../service/sort/util/ExtractNodeUtils.java | 122 ++++++++++++---------
.../source/mongo/MongoDBSourceOperation.java | 101 +++++++++++++++++
...viceTest.java => MongoDBSourceServiceTest.java} | 33 +++---
.../core/source/OracleSourceServiceTest.java | 9 +-
.../core/source/SqlServerSourceServiceTest.java | 8 +-
11 files changed, 482 insertions(+), 84 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 0cb210be9..e59f6805e 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -28,6 +28,8 @@ public enum TaskTypeEnum {
POSTGRES(6),
ORACLE(7),
SQLSERVER(8),
+ MONGODB(9),
+
;
@@ -57,6 +59,8 @@ public enum TaskTypeEnum {
return ORACLE;
case 8:
return SQLSERVER;
+ case 9:
+ return MONGODB;
default:
throw new RuntimeException(String.format("Unsupported taskType=%s", taskType));
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index f5f244aa7..200cc6be6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -35,7 +35,10 @@ public enum SourceType {
PULSAR("PULSAR", TaskTypeEnum.PULSAR),
POSTGRES("POSTGRES", TaskTypeEnum.POSTGRES),
ORACLE("ORACLE", TaskTypeEnum.ORACLE),
- SQLSERVER("SQLSERVER", TaskTypeEnum.SQLSERVER);
+ SQLSERVER("SQLSERVER", TaskTypeEnum.SQLSERVER),
+    MONGODB("MONGODB", TaskTypeEnum.MONGODB), // type id kept identical to SOURCE_MONGODB
+
+ ;
public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
public static final String SOURCE_FILE = "FILE";
@@ -46,6 +49,7 @@ public enum SourceType {
public static final String SOURCE_POSTGRES = "POSTGRES";
public static final String SOURCE_ORACLE = "ORACLE";
public static final String SOURCE_SQLSERVER = "SQLSERVER";
+ public static final String SOURCE_MONGODB = "MONGODB";
@Getter
private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSource.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSource.java
new file mode 100644
index 000000000..45c4b790c
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSource.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.mongodb;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * MongoDB stream source: carries the hosts, credentials, database, collection and primary key of the source.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "MongoDB source info")
+@JsonTypeDefine(value = SourceType.SOURCE_MONGODB)
+public class MongoDBSource extends StreamSource {
+
+    @ApiModelProperty("MongoDB primaryKey")
+    private String primaryKey;
+
+    @ApiModelProperty("MongoDB hosts")
+    private String hosts;
+
+    @ApiModelProperty("MongoDB username")
+    private String username;
+
+    @ApiModelProperty("MongoDB password")
+    private String password;
+
+    @ApiModelProperty("MongoDB database")
+    private String database;
+
+    @ApiModelProperty("MongoDB collection")
+    private String collection;
+
+    public MongoDBSource() {
+        this.setSourceType(SourceType.SOURCE_MONGODB); // same id as the @JsonTypeDefine value above
+    }
+
+    @Override
+    public SourceRequest genSourceRequest() {
+        return CommonBeanUtils.copyProperties(this, MongoDBSourceRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
new file mode 100644
index 000000000..20033831e
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceDTO.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.mongodb;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * MongoDB source info
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class MongoDBSourceDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ @ApiModelProperty("MongoDB primaryKey")
+ private String primaryKey;
+
+ @ApiModelProperty("MongoDB hosts")
+ private String hosts;
+
+ @ApiModelProperty("MongoDB username")
+ private String username;
+
+ @ApiModelProperty("MongoDB password")
+ private String password;
+
+ @ApiModelProperty("MongoDB database")
+ private String database;
+
+ @ApiModelProperty("MongoDB collection")
+ private String collection;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static MongoDBSourceDTO getFromRequest(MongoDBSourceRequest request) {
+ return MongoDBSourceDTO.builder()
+ .primaryKey(request.getPrimaryKey())
+ .hosts(request.getHosts())
+ .username(request.getUsername())
+ .password(request.getPassword())
+ .database(request.getDatabase())
+ .collection(request.getCollection())
+ .build();
+ }
+
+ /**
+ * Get the dto instance from the JSON string
+ */
+ public static MongoDBSourceDTO getFromJson(@NotNull String extParams) {
+ try {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, MongoDBSourceDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceListResponse.java
new file mode 100644
index 000000000..c3451e761
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceListResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.mongodb;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Response item of the MongoDB source paging list, exposing the MongoDB-specific source settings
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of MongoDB paging list")
+@JsonTypeDefine(value = SourceType.SOURCE_MONGODB)
+public class MongoDBSourceListResponse extends SourceListResponse {
+
+    @ApiModelProperty("MongoDB primaryKey")
+    private String primaryKey;
+
+    @ApiModelProperty("MongoDB hosts")
+    private String hosts;
+
+    @ApiModelProperty("MongoDB username")
+    private String username;
+
+    @ApiModelProperty("MongoDB password")
+    private String password;
+
+    @ApiModelProperty("MongoDB database")
+    private String database;
+
+    @ApiModelProperty("MongoDB collection")
+    private String collection;
+
+    public MongoDBSourceListResponse() {
+        this.setSourceType(SourceType.SOURCE_MONGODB); // same id as the @JsonTypeDefine value above
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceRequest.java
new file mode 100644
index 000000000..83cd0bf77
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/mongodb/MongoDBSourceRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.mongodb;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request for saving or updating a MongoDB source, carrying the MongoDB-specific source settings
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of MongoDB source")
+@JsonTypeDefine(value = SourceType.SOURCE_MONGODB)
+public class MongoDBSourceRequest extends SourceRequest {
+
+    @ApiModelProperty("MongoDB primaryKey")
+    private String primaryKey;
+
+    @ApiModelProperty("MongoDB hosts")
+    private String hosts;
+
+    @ApiModelProperty("MongoDB username")
+    private String username;
+
+    @ApiModelProperty("MongoDB password")
+    private String password;
+
+    @ApiModelProperty("MongoDB database")
+    private String database;
+
+    @ApiModelProperty("MongoDB collection")
+    private String collection;
+
+    public MongoDBSourceRequest() {
+        this.setSourceType(SourceType.SOURCE_MONGODB); // explicit id instead of relying on Enum.toString()
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index c31b45991..3f3c44011 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSource;
import org.apache.inlong.manager.common.pojo.source.mysql.MySQLBinlogSource;
import org.apache.inlong.manager.common.pojo.source.oracle.OracleSource;
import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSource;
@@ -40,6 +41,7 @@ import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
@@ -89,6 +91,8 @@ public class ExtractNodeUtils {
return createExtractNode((OracleSource) sourceInfo);
case SQLSERVER:
return createExtractNode((SqlServerSource) sourceInfo);
+ case MONGODB:
+ return createExtractNode((MongoDBSource) sourceInfo);
default:
throw new IllegalArgumentException(
String.format("Unsupported sourceType=%s to create extractNode", sourceType));
@@ -229,7 +233,7 @@ public class ExtractNodeUtils {
.collect(Collectors.toList());
String topic = pulsarSource.getTopic();
- Format format = null;
+ Format format;
DataTypeEnum dataType = DataTypeEnum.forName(pulsarSource.getSerializationType());
switch (dataType) {
case CSV:
@@ -295,84 +299,98 @@ public class ExtractNodeUtils {
}
/**
- * Create Oracle extract node
+ * Create Oracle extract node
*
- * @param oracleSource Oracle source info
+ * @param source Oracle source info
* @return oracle extract node info
*/
- public static OracleExtractNode createExtractNode(OracleSource oracleSource) {
- final String id = oracleSource.getSourceName();
- final String name = oracleSource.getSourceName();
- final String database = oracleSource.getDatabase();
- final String schemaName = oracleSource.getSchemaName();
- final String tableName = oracleSource.getTableName();
- final String primaryKey = oracleSource.getPrimaryKey();
- final String hostName = oracleSource.getHostname();
- final String userName = oracleSource.getUsername();
- final String password = oracleSource.getPassword();
- final Integer port = oracleSource.getPort();
- ScanStartUpMode scanStartupMode = StringUtils.isBlank(oracleSource.getScanStartupMode())
- ? null : ScanStartUpMode.forName(oracleSource.getScanStartupMode());
- List<StreamField> streamFieldInfos = oracleSource.getFieldList();
+ public static OracleExtractNode createExtractNode(OracleSource source) {
+ String name = source.getSourceName();
+ List<StreamField> streamFieldInfos = source.getFieldList();
final List<FieldInfo> fieldInfos = streamFieldInfos.stream()
.map(streamFieldInfo -> FieldInfoUtils.parseStreamFieldInfo(streamFieldInfo, name))
.collect(Collectors.toList());
+
Map<String, String> properties = Maps.newHashMap();
- return new OracleExtractNode(id,
+ ScanStartUpMode scanStartupMode = StringUtils.isBlank(source.getScanStartupMode())
+ ? null : ScanStartUpMode.forName(source.getScanStartupMode());
+
+ return new OracleExtractNode(
+ name,
name,
fieldInfos,
null,
properties,
- primaryKey,
- hostName,
- userName,
- password,
- database,
- schemaName,
- tableName,
- port,
- scanStartupMode);
+ source.getPrimaryKey(),
+ source.getHostname(),
+ source.getUsername(),
+ source.getPassword(),
+ source.getDatabase(),
+ source.getSchemaName(),
+ source.getTableName(),
+ source.getPort(),
+ scanStartupMode
+ );
}
/**
* Create SqlServer extract node
*
- * @param sqlServerSource SqlServer source info
+ * @param source SqlServer source info
* @return SqlServer extract node info
*/
- public static SqlServerExtractNode createExtractNode(SqlServerSource sqlServerSource) {
- final String id = sqlServerSource.getSourceName();
- final String name = sqlServerSource.getSourceName();
- final String database = sqlServerSource.getDatabase();
- final String primaryKey = sqlServerSource.getPrimaryKey();
- final String hostName = sqlServerSource.getHostname();
- final String userName = sqlServerSource.getUsername();
- final String password = sqlServerSource.getPassword();
- final Integer port = sqlServerSource.getPort();
- final String schemaName = sqlServerSource.getSchemaName();
-
- String tablename = sqlServerSource.getTableName();
- List<StreamField> streamFields = sqlServerSource.getFieldList();
+ public static SqlServerExtractNode createExtractNode(SqlServerSource source) {
+ String name = source.getSourceName();
+ List<StreamField> streamFields = source.getFieldList();
List<FieldInfo> fieldInfos = streamFields.stream()
.map(fieldInfo -> FieldInfoUtils.parseStreamFieldInfo(fieldInfo, name))
.collect(Collectors.toList());
- final String serverTimeZone = sqlServerSource.getServerTimezone();
Map<String, String> properties = Maps.newHashMap();
- return new SqlServerExtractNode(id,
+ return new SqlServerExtractNode(
+ name,
name,
fieldInfos,
null,
properties,
- primaryKey,
- hostName,
- port,
- userName,
- password,
- database,
- schemaName,
- tablename,
- serverTimeZone);
+ source.getPrimaryKey(),
+ source.getHostname(),
+ source.getPort(),
+ source.getUsername(),
+ source.getPassword(),
+ source.getDatabase(),
+ source.getSchemaName(),
+ source.getTableName(),
+ source.getServerTimezone()
+ );
+ }
+
+    /**
+     * Build the extract node for a MongoDB source
+     *
+     * @param source MongoDB source info
+     * @return MongoDB extract node info
+     */
+    public static MongoExtractNode createExtractNode(MongoDBSource source) {
+        final String sourceName = source.getSourceName();
+        final List<FieldInfo> nodeFields = source.getFieldList()
+                .stream()
+                .map(field -> FieldInfoUtils.parseStreamFieldInfo(field, sourceName))
+                .collect(Collectors.toList());
+        final Map<String, String> nodeProperties = Maps.newHashMap();
+        return new MongoExtractNode(
+                sourceName, // the source name is passed twice, as the first two arguments
+                sourceName,
+                nodeFields,
+                null,
+                nodeProperties,
+                source.getPrimaryKey(),
+                source.getCollection(),
+                source.getHosts(),
+                source.getUsername(),
+                source.getPassword(),
+                source.getDatabase()
+        );
     }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongo/MongoDBSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongo/MongoDBSourceOperation.java
new file mode 100644
index 000000000..10f66be1c
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/mongo/MongoDBSourceOperation.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.mongo;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSource;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSourceRequest;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.function.Supplier;
+
+/**
+ * Source operation for MongoDB: maps MongoDB requests to stream source entities and back.
+ */
+@Service
+public class MongoDBSourceOperation extends AbstractSourceOperation {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(SourceType sourceType) {
+        return sourceType == SourceType.MONGODB;
+    }
+
+    @Override
+    protected String getSourceType() {
+        return SourceType.MONGODB.getType();
+    }
+
+    @Override
+    protected StreamSource getSource() {
+        return new MongoDBSource();
+    }
+
+    @Override
+    public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
+        if (CollectionUtils.isNotEmpty(entityPage)) {
+            return entityPage.toPageInfo(entity -> this.getFromEntity(entity, MongoDBSourceListResponse::new));
+        }
+        return new PageInfo<>();
+    }
+
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        MongoDBSourceRequest mongoRequest = (MongoDBSourceRequest) request;
+        CommonBeanUtils.copyProperties(mongoRequest, targetEntity, true);
+        try {
+            String extParams = objectMapper.writeValueAsString(MongoDBSourceDTO.getFromRequest(mongoRequest));
+            targetEntity.setExtParams(extParams);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+    @Override
+    public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+        final T instance = target.get();
+        if (entity != null) {
+            final String expectedType = getSourceType();
+            final String actualType = entity.getSourceType();
+            Preconditions.checkTrue(expectedType.equals(actualType),
+                    String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), expectedType, actualType));
+            MongoDBSourceDTO extDto = MongoDBSourceDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(entity, instance, true);
+            CommonBeanUtils.copyProperties(extDto, instance, true);
+        }
+        return instance;
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/MongoDBSourceServiceTest.java
similarity index 75%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
copy to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/MongoDBSourceServiceTest.java
index c467001a8..30661f729 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/MongoDBSourceServiceTest.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.core.source;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import org.apache.inlong.manager.common.pojo.source.oracle.OracleSource;
-import org.apache.inlong.manager.common.pojo.source.oracle.OracleSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSource;
+import org.apache.inlong.manager.common.pojo.source.mongodb.MongoDBSourceRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
@@ -30,17 +30,14 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
/**
- * Oracle source service test
+ * MongoDB source service test
*/
-public class OracleSourceServiceTest extends ServiceBaseTest {
+public class MongoDBSourceServiceTest extends ServiceBaseTest {
- private final String sourceName = "stream_source_service_test";
private static final String hostname = "127.0.0.1";
- private static final Integer port = 1521;
- private static final String database = "oracle_database";
- private static final String schema = "oracle_schema";
- private static final String tablename = "oracle_tablename";
-
+ private static final String database = "mongo_database";
+ private static final String collection = "mongo_collection";
+ private final String sourceName = "stream_source_service_test";
@Autowired
private StreamSourceService sourceService;
@Autowired
@@ -52,16 +49,14 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
public Integer saveSource() {
streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
- OracleSourceRequest sourceInfo = new OracleSourceRequest();
+ MongoDBSourceRequest sourceInfo = new MongoDBSourceRequest();
sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID);
sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
sourceInfo.setSourceName(sourceName);
- sourceInfo.setSourceType(SourceType.ORACLE.getType());
- sourceInfo.setHostname(hostname);
+ sourceInfo.setSourceType(SourceType.MONGODB.getType());
+ sourceInfo.setHosts(hostname);
sourceInfo.setDatabase(database);
- sourceInfo.setTableName(tablename);
- sourceInfo.setPort(port);
- sourceInfo.setSchemaName(schema);
+ sourceInfo.setCollection(collection);
return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
}
@@ -77,7 +72,6 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
@Test
public void testListByIdentifier() {
Integer id = this.saveSource();
-
StreamSource source = sourceService.get(id);
Assert.assertEquals(GLOBAL_GROUP_ID, source.getInlongGroupId());
@@ -90,9 +84,8 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
StreamSource response = sourceService.get(id);
Assert.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
- OracleSource oracleSource = (OracleSource) response;
-
- OracleSourceRequest request = CommonBeanUtils.copyProperties(oracleSource, OracleSourceRequest::new);
+ MongoDBSource mongoDBSource = (MongoDBSource) response;
+ MongoDBSourceRequest request = CommonBeanUtils.copyProperties(mongoDBSource, MongoDBSourceRequest::new);
boolean result = sourceService.update(request, GLOBAL_OPERATOR);
Assert.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
index c467001a8..44180badd 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/OracleSourceServiceTest.java
@@ -34,13 +34,12 @@ import org.springframework.beans.factory.annotation.Autowired;
*/
public class OracleSourceServiceTest extends ServiceBaseTest {
- private final String sourceName = "stream_source_service_test";
private static final String hostname = "127.0.0.1";
private static final Integer port = 1521;
private static final String database = "oracle_database";
private static final String schema = "oracle_schema";
- private static final String tablename = "oracle_tablename";
-
+ private static final String tableName = "oracle_table";
+ private final String sourceName = "stream_source_service_test";
@Autowired
private StreamSourceService sourceService;
@Autowired
@@ -59,7 +58,7 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
sourceInfo.setSourceType(SourceType.ORACLE.getType());
sourceInfo.setHostname(hostname);
sourceInfo.setDatabase(database);
- sourceInfo.setTableName(tablename);
+ sourceInfo.setTableName(tableName);
sourceInfo.setPort(port);
sourceInfo.setSchemaName(schema);
return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
@@ -77,7 +76,6 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
@Test
public void testListByIdentifier() {
Integer id = this.saveSource();
-
StreamSource source = sourceService.get(id);
Assert.assertEquals(GLOBAL_GROUP_ID, source.getInlongGroupId());
@@ -91,7 +89,6 @@ public class OracleSourceServiceTest extends ServiceBaseTest {
Assert.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
OracleSource oracleSource = (OracleSource) response;
-
OracleSourceRequest request = CommonBeanUtils.copyProperties(oracleSource, OracleSourceRequest::new);
boolean result = sourceService.update(request, GLOBAL_OPERATOR);
Assert.assertTrue(result);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java
index 898c97361..91d8b718b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/source/SqlServerSourceServiceTest.java
@@ -34,13 +34,12 @@ import org.springframework.beans.factory.annotation.Autowired;
*/
public class SqlServerSourceServiceTest extends ServiceBaseTest {
- private final String sourceName = "stream_source_service_test";
private static final String hostname = "127.0.0.1";
private static final Integer port = 1521;
private static final String database = "sqlserver_database";
private static final String schema = "sqlserver_schema";
- private static final String tablename = "sqlserver_tablename";
-
+ private static final String tableName = "sqlserver_table";
+ private final String sourceName = "stream_source_service_test";
@Autowired
private StreamSourceService sourceService;
@Autowired
@@ -59,7 +58,7 @@ public class SqlServerSourceServiceTest extends ServiceBaseTest {
sourceInfo.setSourceType(SourceType.SQLSERVER.getType());
sourceInfo.setHostname(hostname);
sourceInfo.setDatabase(database);
- sourceInfo.setTableName(tablename);
+ sourceInfo.setTableName(tableName);
sourceInfo.setPort(port);
sourceInfo.setSchemaName(schema);
return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
@@ -90,7 +89,6 @@ public class SqlServerSourceServiceTest extends ServiceBaseTest {
Assert.assertEquals(GLOBAL_GROUP_ID, response.getInlongGroupId());
SqlServerSource sqlserverSource = (SqlServerSource) response;
-
SqlServerSourceRequest request = CommonBeanUtils.copyProperties(sqlserverSource, SqlServerSourceRequest::new);
boolean result = sourceService.update(request, GLOBAL_OPERATOR);
Assert.assertTrue(result);