You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/27 05:45:52 UTC

[rocketmq-connect] branch master updated: [ISSUE #312] mongo source connector adapt to new api 0.1.4 (#313)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 21f13c56 [ISSUE #312] mongo source connector adapt to new api 0.1.4 (#313)
21f13c56 is described below

commit 21f13c56c5d07d390d1a31c8a741c269316e905d
Author: Oliver <wq...@163.com>
AuthorDate: Tue Sep 27 13:45:47 2022 +0800

    [ISSUE #312] mongo source connector adapt to new api 0.1.4 (#313)
---
 connectors/rocketmq-connect-mongo/pom.xml          |  22 ++-
 .../org/apache/connect/mongo/SourceTaskConfig.java |  20 +++
 .../mongo/connector/MongoSourceConnector.java      |  33 ++--
 .../connect/mongo/connector/MongoSourceTask.java   |  49 +++---
 .../mongo/connector/builder/MongoDataEntry.java    | 173 ++++++++++-----------
 .../apache/connect/mongo/initsync/InitSync.java    |   3 -
 .../apache/connect/mongo/replicator/Constants.java |  19 +++
 .../mongo/replicator/MongoClientFactory.java       |  15 +-
 .../apache/connect/mongo/replicator/Position.java  |   2 +-
 .../connect/mongo/replicator/ReplicaSet.java       |  19 ++-
 .../connect/mongo/replicator/ReplicaSetConfig.java |  10 ++
 .../mongo/replicator/ReplicaSetsContext.java       |  30 ++--
 .../connect/mongo/replicator/ReplicatorTask.java   |   4 +-
 .../mongo/replicator/event/OperationType.java      |   4 +
 .../mongo/replicator/event/ReplicationEvent.java   |  14 --
 .../org/apache/connect/mongo/MongoFactoryTest.java |  23 +--
 .../connect/mongo/MongoSourceConnectorTest.java    |  58 +++----
 .../apache/connect/mongo/MongoSourceTaskTest.java  |  86 ++++------
 .../java/org/apache/connect/mongo/MongoTest.java   |  47 +++---
 .../apache/connect/mongo/ReplicaContextTest.java   |   2 +-
 .../org/apache/connect/mongo/ReplicaSetTest.java   |   2 +-
 .../connect/mongo/TestPositionStorageReader.java   |  35 +++++
 .../connect/runtime/config/WorkerConfig.java       |   2 +-
 23 files changed, 351 insertions(+), 321 deletions(-)

diff --git a/connectors/rocketmq-connect-mongo/pom.xml b/connectors/rocketmq-connect-mongo/pom.xml
index 2fc00d13..9f9c1be5 100644
--- a/connectors/rocketmq-connect-mongo/pom.xml
+++ b/connectors/rocketmq-connect-mongo/pom.xml
@@ -140,6 +140,24 @@
                     </excludes>
                 </configuration>
             </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 
@@ -148,7 +166,7 @@
         <dependency>
             <groupId>org.mongodb</groupId>
             <artifactId>mongodb-driver</artifactId>
-            <version>3.10.1</version>
+            <version>3.12.11</version>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
@@ -163,7 +181,7 @@
         <dependency>
             <groupId>io.openmessaging</groupId>
             <artifactId>openmessaging-connector</artifactId>
-            <version>0.1.1</version>
+            <version>0.1.4</version>
         </dependency>
         <dependency>
             <groupId>ch.qos.logback</groupId>
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
index d184b5ce..10f9a57c 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
@@ -47,6 +47,26 @@ public class SourceTaskConfig {
     private String trustStorePassword;
     private int copyThread = Runtime.getRuntime().availableProcessors();
 
+    private long maxConnectionIdleTime;
+
+    private boolean socketKeepAlive;
+
+    public boolean getSocketKeepAlive() {
+        return socketKeepAlive;
+    }
+
+    public void setSocketKeepAlive(boolean socketKeepAlive) {
+        this.socketKeepAlive = socketKeepAlive;
+    }
+
+    public long getMaxConnectionIdleTime() {
+        return maxConnectionIdleTime;
+    }
+
+    public void setMaxConnectionIdleTime(long maxConnectionIdleTime) {
+        this.maxConnectionIdleTime = maxConnectionIdleTime;
+    }
+
     public static final Set<String> REQUEST_CONFIG = Collections.unmodifiableSet(new HashSet<String>() {
         {
             add("mongoAddr");
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
index 5be2e0d1..9939de4e 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -18,8 +18,8 @@
 package org.apache.connect.mongo.connector;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.connect.mongo.SourceTaskConfig;
@@ -32,34 +32,26 @@ public class MongoSourceConnector extends SourceConnector {
     private KeyValue keyValueConfig;
 
     @Override
-    public String verifyAndSetConfig(KeyValue config) {
+    public void start(KeyValue config) {
         for (String requestKey : SourceTaskConfig.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                throw new RuntimeException("Request config key: " + requestKey);
             }
         }
         this.keyValueConfig = config;
-        return "";
-    }
-
-    @Override
-    public void start() {
-        logger.info("start mongo source connector:{}", keyValueConfig);
     }
 
     @Override
     public void stop() {
-
+        this.keyValueConfig = null;
     }
 
-    @Override
-    public void pause() {
 
-    }
-
-    @Override
-    public void resume() {
 
+    @Override public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> config = new ArrayList<>();
+        config.add(this.keyValueConfig);
+        return config;
     }
 
     @Override
@@ -67,10 +59,5 @@ public class MongoSourceConnector extends SourceConnector {
         return MongoSourceTask.class;
     }
 
-    @Override
-    public List<KeyValue> taskConfigs() {
-        List<KeyValue> config = new ArrayList<>();
-        config.add(this.keyValueConfig);
-        return config;
-    }
+
 }
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 49bcf49a..c913a3e5 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -17,14 +17,16 @@
 
 package org.apache.connect.mongo.connector;
 
-import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.replicator.Constants;
 import org.apache.connect.mongo.replicator.Position;
 import org.apache.connect.mongo.replicator.ReplicaSet;
 import org.apache.connect.mongo.replicator.ReplicaSetManager;
@@ -43,7 +45,7 @@ public class MongoSourceTask extends SourceTask {
     private ReplicaSetsContext replicaSetsContext;
 
     @Override
-    public Collection<SourceDataEntry> poll() {
+    public List<ConnectRecord> poll() {
 
         return replicaSetsContext.poll();
     }
@@ -59,20 +61,18 @@ public class MongoSourceTask extends SourceTask {
             replicaSetManager = ReplicaSetManager.create(sourceTaskConfig.getMongoAddr());
 
             replicaSetManager.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
-                ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
-                    replicaSetName.getBytes()));
-                if (byteBuffer != null && byteBuffer.array().length > 0) {
-                    String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8);
-                    Position position = JSONObject.parseObject(positionJson, Position.class);
+                final RecordOffset recordOffset = this.sourceTaskContext.offsetStorageReader().readOffset(this.buildRecordPartition(replicaSetName));
+                if (recordOffset != null && recordOffset.getOffset().size() > 0) {
+                    final Map<String, Object> offset = (Map<String, Object>) recordOffset.getOffset();
+                    Position position = new Position();
+                    position.setTimeStamp((int) offset.get(Constants.TIMESTAMP));
                     replicaSetConfig.setPosition(position);
                 } else {
                     Position position = new Position();
                     position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp());
-                    position.setInc(sourceTaskConfig.getPositionInc());
-                    position.setInitSync(sourceTaskConfig.isDataSync());
                     replicaSetConfig.setPosition(position);
                 }
-
+                replicaSetConfig.setMaxTask(config.getInt(Constants.MAX_TASK));
                 ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
                 replicaSetsContext.addReplicaSet(replicaSet);
                 replicaSet.start();
@@ -84,22 +84,17 @@ public class MongoSourceTask extends SourceTask {
         }
     }
 
+    private RecordPartition buildRecordPartition(String replicaSetName) {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put(Constants.REPLICA_SET_NAME, replicaSetName);
+        RecordPartition  recordPartition = new RecordPartition(partitionMap);
+        return recordPartition;
+    }
+
     @Override
     public void stop() {
         logger.info("shut down.....");
         replicaSetsContext.shutdown();
     }
 
-    @Override
-    public void pause() {
-        logger.info("pause replica task...");
-        replicaSetsContext.pause();
-    }
-
-    @Override
-    public void resume() {
-        logger.info("resume replica task...");
-        replicaSetsContext.resume();
-    }
-
 }
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
index 1d6dfe53..730bec08 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
@@ -17,118 +17,115 @@
 
 package org.apache.connect.mongo.connector.builder;
 
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
 import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.connect.mongo.replicator.Constants;
 import org.apache.connect.mongo.replicator.Position;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
-import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.bson.BsonTimestamp;
-
-import static org.apache.connect.mongo.replicator.Constants.CREATED;
-import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
-import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
-import static org.apache.connect.mongo.replicator.Constants.PATCH;
-import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
-import static org.apache.connect.mongo.replicator.Constants.VERSION;
+import org.bson.Document;
 
 public class MongoDataEntry {
 
-    private static String SCHEMA_CREATED_NAME = "mongo_created";
-    private static String SCHEMA_OPLOG_NAME = "mongo_oplog";
-
-    public static SourceDataEntry createSouceDataEntry(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
-
-        DataEntryBuilder dataEntryBuilder;
-
-        if (event.getOperationType().equals(OperationType.CREATED)) {
-            Schema schema = createdSchema(replicaSetConfig.getReplicaSetName());
-            dataEntryBuilder = new DataEntryBuilder(schema);
-            dataEntryBuilder.timestamp(System.currentTimeMillis())
-                .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
-                .entryType(event.getEntryType());
-
-            dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
-            dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
-
-        } else {
-            Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName());
-            dataEntryBuilder = new DataEntryBuilder(schema);
-            dataEntryBuilder.timestamp(System.currentTimeMillis())
-                .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
-                .entryType(event.getEntryType());
-            dataEntryBuilder.putFiled(OPERATION_TYPE, event.getOperationType().name());
-            dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue());
-            dataEntryBuilder.putFiled(VERSION, event.getV());
-            dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
-            dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
-            dataEntryBuilder.putFiled(OBJECT_ID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
+    public static ConnectRecord createSourceDataEntry(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
+        final Position position = replicaSetConfig.getPosition();
+        final int oldTimestamp = position.getTimeStamp();
+        final BsonTimestamp timestamp = event.getTimestamp();
+        if (oldTimestamp != 0 && timestamp != null &&  timestamp.getTime() <= oldTimestamp) {
+            return null;
         }
-
-        String position = createPosition(event, replicaSetConfig);
-        SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-            ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
-            ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
-        return sourceDataEntry;
+        Schema schema = SchemaBuilder.struct().name(Constants.MONGO).build();
+        final List<Field> fields = buildFields(event.getDocument());
+        schema.setFields(fields);
+        final ConnectRecord connectRecord = new ConnectRecord(buildRecordPartition(replicaSetConfig),
+            buildRecordOffset(event, replicaSetConfig),
+            System.currentTimeMillis(),
+            schema,
+            buildPayLoad(fields, event, schema));
+        connectRecord.setExtensions(buildExtendFiled(event));
+        return connectRecord;
     }
 
-    private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
-        Position position = new Position();
-        BsonTimestamp timestamp = event.getTimestamp();
-        position.setInc(timestamp != null ? timestamp.getInc() : 0);
-        position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0);
-        position.setInitSync(event.getOperationType().equals(OperationType.CREATED) ? true : false);
-        return JSONObject.toJSONString(position);
+    private static RecordPartition buildRecordPartition(ReplicaSetConfig replicaSetConfig) {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put(Constants.REPLICA_SET_NAME, replicaSetConfig.getReplicaSetName());
+        RecordPartition  recordPartition = new RecordPartition(partitionMap);
+        return recordPartition;
+    }
 
+    private static RecordOffset buildRecordOffset(ReplicationEvent event, ReplicaSetConfig config)  {
+        Map<String, Integer> offsetMap = new HashMap<>();
+        final Position position = config.getPosition();
+        offsetMap.put(Constants.TIMESTAMP, event.getTimestamp() != null ? event.getTimestamp().getTime() : position.getTimeStamp());
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        return recordOffset;
     }
 
-    private static Schema createdSchema(String dataSourceName) {
-        Schema schema = new Schema();
-        schema.setDataSource(dataSourceName);
-        schema.setName(SCHEMA_CREATED_NAME);
-        schema.setFields(new ArrayList<>());
-        createdField(schema);
-        return schema;
+    private static List<Field> buildFields(Document document) {
+        List<Field> fields = new ArrayList<>();
+        int i = 0;
+        for (Map.Entry<String, Object> entry : document.entrySet()) {
+            final String key = entry.getKey();
+            final Object value = entry.getValue();
+            fields.add(new Field(i++, key, getSchema(value)));
+        }
+        return fields;
     }
 
-    private static Schema oplogSchema(String dataSourceName) {
-        Schema schema = new Schema();
-        schema.setDataSource(dataSourceName);
-        schema.setName(SCHEMA_OPLOG_NAME);
-        oplogField(schema);
-        return schema;
+    private static Struct buildPayLoad(List<Field> fields, ReplicationEvent event, Schema schema) {
+        Struct payLoad = new Struct(schema);
+        final Document document = event.getDocument();
+        for (Field field : fields) {
+            final Schema valueSchema = field.getSchema();
+            if (valueSchema.getFieldType().equals(FieldType.STRING)) {
+                payLoad.put(field, JSON.toJSONString(document.get(field.getName())));
+            } else {
+                payLoad.put(field, document.get(field.getName()));
+            }
+        }
+        return payLoad;
     }
 
-    private static void createdField(Schema schema) {
-        Field namespace = new Field(0, NAMESPACE, FieldType.STRING);
-        schema.getFields().add(namespace);
-        Field operation = new Field(1, Constants.CREATED, FieldType.STRING);
-        schema.getFields().add(operation);
+    private static KeyValue buildExtendFiled(ReplicationEvent event) {
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(Constants.OPERATION_TYPE,  event.getOperationType().getOperation());
+        keyValue.put(Constants.COLLECTION_NAME, event.getCollectionName());
+        keyValue.put(Constants.NAMESPACE, event.getNamespace());
+        keyValue.put(Constants.REPLICA_SET_NAME, event.getReplicaSetName());
+        return keyValue;
     }
 
-    private static void oplogField(Schema schema) {
-        schema.setFields(new ArrayList<>());
-        Field op = new Field(0, OPERATION_TYPE, FieldType.STRING);
-        schema.getFields().add(op);
-        Field time = new Field(1, TIMESTAMP, FieldType.INT64);
-        schema.getFields().add(time);
-        Field v = new Field(2, VERSION, FieldType.INT32);
-        schema.getFields().add(v);
-        Field namespace = new Field(3, NAMESPACE, FieldType.STRING);
-        schema.getFields().add(namespace);
-        Field patch = new Field(4, PATCH, FieldType.STRING);
-        schema.getFields().add(patch);
-        Field objectId = new Field(5, OBJECT_ID, FieldType.STRING);
-        schema.getFields().add(objectId);
+    private static Schema getSchema(Object value) {
+        Schema schema = null;
+        if (value instanceof Date) {
+            schema = SchemaBuilder.time().build();
+        } else if (value instanceof Document) {
+            schema = SchemaBuilder.string().build();
+        } else if (value instanceof Long) {
+            schema = SchemaBuilder.int64().build();
+        } else if (value instanceof Integer) {
+            schema = SchemaBuilder.int32().build();
+        } else if (value instanceof Boolean) {
+            schema = SchemaBuilder.bool().build();
+        } else {
+            schema = SchemaBuilder.string().build();
+        }
+        return schema;
     }
 
 }
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index a51b727f..ae704c34 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -67,8 +67,6 @@ public class InitSync {
             countDownLatch.await();
         } catch (InterruptedException e) {
             logger.error("init sync wait countDownLatch interrupted");
-        } finally {
-            copyExecutor.shutdown();
         }
     }
 
@@ -76,7 +74,6 @@ public class InitSync {
         interestCollections = getInterestCollection();
         copyThreadCount = Math.min(interestCollections.size(), context.getCopyThread());
         copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() {
-
             AtomicInteger threads = new AtomicInteger();
 
             @Override
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Constants.java
index 7ba1ac49..b45eeec5 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Constants.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Constants.java
@@ -31,6 +31,25 @@ public class Constants {
     public static final String OBJECT_ID = "o2";
 
     public static final String CREATED = "created";
+
     public static final String PATCH = "patch";
 
+    public static final String DOCUMENT = "document";
+
+    public static final String COLLECTION_NAME = "collectionName";
+
+    public static final String REPLICA_SET_NAME = "replicaSetName";
+
+    public static final String INCREMENT = "increment";
+
+    public static final String INIT_SYNC = "initSync";
+
+    public static String SCHEMA_CREATED_NAME = "mongo_created";
+
+    public static String SCHEMA_OPLOG_NAME = "mongo_oplog";
+
+    public static String MONGO = "MONGO";
+
+    public static final String MAX_TASK = "max.tasks";
+
 }
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
index 11bca8ff..41a3364f 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
@@ -109,6 +109,18 @@ public class MongoClientFactory {
             sb.append(taskConfig.getZlibCompressionLevel());
         }
 
+        if (taskConfig.getMaxConnectionIdleTime() > 0) {
+            sb.append("&");
+            sb.append("maxConnectionIdleTime=");
+            sb.append(taskConfig.getMaxConnectionIdleTime());
+        }
+
+        if (taskConfig.getSocketKeepAlive()) {
+            sb.append("&");
+            sb.append("socketKeepAlive=");
+            sb.append(true);
+        }
+
         if (StringUtils.isNotBlank(taskConfig.getTrustStore())) {
             Properties properties = System.getProperties();
             properties.put("javax.net.ssl.trustStore", taskConfig.getTrustStore());
@@ -123,7 +135,8 @@ public class MongoClientFactory {
 
         logger.info("connection string :{}", sb.toString());
         ConnectionString connectionString = new ConnectionString(sb.toString());
-        return MongoClients.create(connectionString);
+        final MongoClient client = MongoClients.create(connectionString);
+        return client;
     }
 
 }
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Position.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Position.java
index 29fd8565..830a1e37 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Position.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Position.java
@@ -64,7 +64,7 @@ public class Position {
         return timeStamp > 0 && inc > 0;
     }
 
-    public BsonTimestamp converBsonTimeStamp() {
+    public BsonTimestamp convertBsonTimeStamp() {
         return new BsonTimestamp(timeStamp, inc);
     }
 
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
index 83933165..123c2680 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
@@ -63,7 +63,14 @@ public class ReplicaSet {
         try {
             this.mongoClient = replicaSetsContext.createMongoClient(replicaSetConfig);
             this.checkReplicaMongo();
-            executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
+            final Integer maxTask = replicaSetConfig.getMaxTask();
+            if (maxTask != null && maxTask > 1) {
+                for (int i = 0; i < maxTask; i++) {
+                    executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
+                }
+            } else {
+                executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
+            }
         } catch (Exception e) {
             logger.error("start replicator:{} error", replicaSetConfig, e);
             shutdown();
@@ -83,15 +90,7 @@ public class ReplicaSet {
     }
 
     public void shutdown() {
-        if (running.compareAndSet(true, false)) {
-            if (!this.executorService.isShutdown()) {
-                executorService.shutdown();
-            }
-            if (this.mongoClient != null) {
-                this.mongoClient.close();
-            }
-        }
-
+        running.compareAndSet(true, false);
     }
 
     public void pause() {
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
index ced90b8b..2892f273 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
@@ -24,6 +24,16 @@ public class ReplicaSetConfig {
     private String host;
     private Position position;
 
+    private Integer maxTask;
+
+    public Integer getMaxTask() {
+        return maxTask;
+    }
+
+    public void setMaxTask(Integer maxTask) {
+        this.maxTask = maxTask;
+    }
+
     public Position getPosition() {
         return position;
     }
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
index 8dd85d75..51febda3 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -18,12 +18,9 @@
 package org.apache.connect.mongo.replicator;
 
 import com.mongodb.client.MongoClient;
-import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,10 +28,14 @@ import org.apache.connect.mongo.SourceTaskConfig;
 import org.apache.connect.mongo.connector.builder.MongoDataEntry;
 import org.apache.connect.mongo.initsync.CollectionMeta;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ReplicaSetsContext {
 
-    private BlockingQueue<SourceDataEntry> dataEntryQueue;
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private BlockingQueue<ConnectRecord> connectRecordQueue;
 
     private SourceTaskConfig taskConfig;
 
@@ -50,7 +51,7 @@ public class ReplicaSetsContext {
     public ReplicaSetsContext(SourceTaskConfig taskConfig) {
         this.taskConfig = taskConfig;
         this.replicaSets = new ArrayList<>();
-        this.dataEntryQueue = new LinkedBlockingDeque<>();
+        this.connectRecordQueue = new LinkedBlockingDeque<>();
         this.operationFilter = new OperationFilter(taskConfig);
         this.mongoClientFactory = new MongoClientFactory(taskConfig);
     }
@@ -88,20 +89,25 @@ public class ReplicaSetsContext {
     }
 
     public void publishEvent(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
-        SourceDataEntry sourceDataEntry = MongoDataEntry.createSouceDataEntry(event, replicaSetConfig);
+
+        ConnectRecord connectRecord = MongoDataEntry.createSourceDataEntry(event, replicaSetConfig);
+        if (connectRecord == null) {
+            return;
+        }
         while (true) {
             try {
-                dataEntryQueue.put(sourceDataEntry);
+                connectRecordQueue.put(connectRecord);
                 break;
-            } catch (InterruptedException e) {
+            } catch (Exception e) {
+                logger.error("convert error", e);
             }
         }
 
     }
 
-    public Collection<SourceDataEntry> poll() {
-        List<SourceDataEntry> res = new ArrayList<>();
-        if (dataEntryQueue.drainTo(res, 20) == 0) {
+    public List<ConnectRecord> poll() {
+        List<ConnectRecord> res = new ArrayList<>();
+        if (connectRecordQueue.drainTo(res, 20) == 0) {
             try {
                 Thread.sleep(10);
             } catch (InterruptedException e) {
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index cd78f241..048dd6d9 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -61,7 +61,7 @@ public class ReplicatorTask implements Runnable {
         boolean needDataSync = !userConfigOrRuntimePosition.isValid()
             || userConfigOrRuntimePosition.isInitSync()
             // userConfigOrRuntimePosition.position < firstAvailablePosition maybe lost some operations
-            || userConfigOrRuntimePosition.converBsonTimeStamp().compareTo(firstAvailablePosition) < 0;
+            || userConfigOrRuntimePosition.convertBsonTimeStamp().compareTo(firstAvailablePosition) < 0;
 
         if (needDataSync) {
             recordLastOplogPosition();
@@ -76,7 +76,7 @@ public class ReplicatorTask implements Runnable {
 
         MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
         FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
-            Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
+            Filters.gt("ts", replicaSetConfig.getPosition().convertBsonTimeStamp()));
 
         MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1))
             .noCursorTimeout(true)
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
index b4186667..62152b94 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
@@ -31,6 +31,10 @@ public enum OperationType {
 
     private final String operation;
 
+    public String getOperation() {
+        return operation;
+    }
+
     OperationType(String operation) {
         this.operation = operation;
     }
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
index 7adca716..ac9a666a 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
@@ -17,7 +17,6 @@
 
 package org.apache.connect.mongo.replicator.event;
 
-import io.openmessaging.connector.api.data.EntryType;
 import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.bson.BsonTimestamp;
@@ -92,19 +91,6 @@ public class ReplicationEvent {
         return objectId;
     }
 
-    public EntryType getEntryType() {
-        switch (operationType) {
-            case UPDATE:
-                return EntryType.UPDATE;
-            case DELETE:
-                return EntryType.DELETE;
-            case INSERT:
-                return EntryType.CREATE;
-            default:
-                return EntryType.CREATE;
-        }
-    }
-
     public void setOperationType(OperationType operationType) {
         this.operationType = operationType;
     }
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
index 1e077817..73b9c955 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
@@ -46,7 +46,7 @@ public class MongoFactoryTest {
 
     @Before
     public void before() {
-        this.replicaSetConfig = new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27027");
+        this.replicaSetConfig = new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27017");
         this.sourceTaskConfig = new SourceTaskConfig();
         this.mongoClientFactory = new MongoClientFactory(sourceTaskConfig);
     }
@@ -170,25 +170,4 @@ public class MongoFactoryTest {
         return null;
     }
 
-    @Test
-    public void testSSLTrustStore() {
-        sourceTaskConfig.setMongoUserName("user_test");
-        sourceTaskConfig.setMongoPassWord("pwd_test");
-        sourceTaskConfig.setSsl(true);
-        sourceTaskConfig.setSslInvalidHostNameAllowed(true);
-        sourceTaskConfig.setTrustStore("/Users/home/test.pem");
-        sourceTaskConfig.setTrustStorePassword("test001");
-        sourceTaskConfig.setServerSelectionTimeoutMS(10000);
-        MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig);
-        MongoCollection<Document> collection = client.getDatabase("test").getCollection("person");
-        Document document = new Document();
-        document.put("name", "test");
-        collection.insertOne(document);
-        MongoCursor<Document> iterator = collection.find().iterator();
-        while (iterator.hasNext()) {
-            System.out.println(iterator.next());
-        }
-
-    }
-
 }
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
index 7206408d..75491645 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -17,18 +17,18 @@
 
 package org.apache.connect.mongo;
 
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.EntryType;
-import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPosition;
+import io.openmessaging.connector.api.data.Struct;
 import io.openmessaging.internal.DefaultKeyValue;
 import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.connect.mongo.connector.MongoSourceConnector;
 import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.apache.connect.mongo.replicator.Constants;
 import org.apache.connect.mongo.replicator.Position;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.ReplicaSetsContext;
@@ -59,17 +59,12 @@ public class MongoSourceConnectorTest {
         Assert.assertEquals(mongoSourceConnector.taskClass(), MongoSourceTask.class);
     }
 
-    @Test
-    public void verifyConfig() {
-        String s = mongoSourceConnector.verifyAndSetConfig(keyValue);
-        Assert.assertTrue(s.contains("Request config key:"));
-    }
 
     @Test
     public void testPoll() throws Exception {
-        LinkedBlockingQueue<SourceDataEntry> entries = new LinkedBlockingQueue<>();
+        LinkedBlockingQueue<ConnectRecord> entries = new LinkedBlockingQueue<>();
         ReplicaSetsContext context = new ReplicaSetsContext(sourceTaskConfig);
-        Field dataEntryQueue = ReplicaSetsContext.class.getDeclaredField("dataEntryQueue");
+        Field dataEntryQueue = ReplicaSetsContext.class.getDeclaredField("connectRecordQueue");
         dataEntryQueue.setAccessible(true);
         dataEntryQueue.set(context, entries);
         ReplicationEvent event = new ReplicationEvent();
@@ -80,32 +75,29 @@ public class MongoSourceConnectorTest {
         event.setH(324243242L);
         event.setEventData(Optional.ofNullable(new Document("testEventKey", "testEventValue")));
         event.setObjectId(Optional.empty());
-        context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27027"));
-        List<SourceDataEntry> sourceDataEntries = (List<SourceDataEntry>) context.poll();
-        Assert.assertTrue(sourceDataEntries.size() == 1);
-
-        SourceDataEntry sourceDataEntry = sourceDataEntries.get(0);
-        Assert.assertEquals("test-person", sourceDataEntry.getQueueName());
+        event.setReplicaSetName("testReplicaName");
+        event.setCollectionName("testCollectName");
+        final ReplicaSetConfig name = new ReplicaSetConfig("", "testReplicaName", "localhost:27027");
+        Position position = new Position();
+        position.setTimeStamp(0);
+        name.setPosition(position);
+        context.publishEvent(event, name);
+        List<ConnectRecord> connectRecords = context.poll();
+        Assert.assertTrue(connectRecords.size() == 1);
 
-        ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
-        Assert.assertEquals("testReplicaName", new String(sourcePartition.array()));
+        ConnectRecord connectRecord = connectRecords.get(0);
+        final Struct data = (Struct) connectRecord.getData();
 
-        ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
-        Position position = JSONObject.parseObject(new String(sourcePosition.array()), Position.class);
-        Assert.assertEquals(position.getTimeStamp(), 1565609506);
-        Assert.assertEquals(position.getInc(), 1);
-        Assert.assertEquals(position.isInitSync(), false);
+        Assert.assertEquals("test.person", connectRecord.getExtension(Constants.NAMESPACE));
+        Assert.assertEquals("testReplicaName", connectRecord.getExtension(Constants.REPLICA_SET_NAME));
 
-        EntryType entryType = sourceDataEntry.getEntryType();
-        Assert.assertEquals(EntryType.CREATE, entryType);
+        final RecordPosition recordPosition = connectRecord.getPosition();
+        final RecordOffset offsetMap = recordPosition.getOffset();
+        Assert.assertEquals(1565609506, offsetMap.getOffset().get(Constants.TIMESTAMP));
 
-        String queueName = sourceDataEntry.getQueueName();
-        Assert.assertEquals("test-person", queueName);
+        final List<io.openmessaging.connector.api.data.Field> fields = connectRecord.getSchema().getFields();
+        Assert.assertEquals(1, fields.size());
 
-        Schema schema = sourceDataEntry.getSchema();
-        Assert.assertTrue(schema.getFields().size() == 6);
-        Object[] payload = sourceDataEntry.getPayload();
-        Assert.assertTrue(payload.length == 6);
 
     }
 
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
index 1e0c82ab..48b5f759 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
@@ -17,18 +17,13 @@
 
 package org.apache.connect.mongo;
 
-import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.PositionStorageReader;
-import io.openmessaging.connector.api.source.SourceTask;
-import io.openmessaging.connector.api.source.SourceTaskContext;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
 import io.openmessaging.internal.DefaultKeyValue;
 import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.connect.mongo.connector.MongoSourceTask;
 import org.apache.connect.mongo.replicator.ReplicaSet;
@@ -49,7 +44,7 @@ public class MongoSourceTaskTest {
         defaultKeyValue.put("serverSelectionTimeoutMS", "10");
         defaultKeyValue.put("dataSync", "true");
 
-        Field context = SourceTask.class.getDeclaredField("context");
+        Field context = SourceTask.class.getDeclaredField("sourceTaskContext");
         context.setAccessible(true);
         context.set(mongoSourceTask, emptyTaskContext());
         mongoSourceTask.start(defaultKeyValue);
@@ -69,30 +64,26 @@ public class MongoSourceTaskTest {
         Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getReplicaSetName(), "test"));
         Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getHost(), "127.0.0.1:27027"));
         Assert.assertTrue(replicaSetConfig1.getPosition().getTimeStamp() == 11111111);
-        Assert.assertTrue(replicaSetConfig1.getPosition().getInc() == 111);
-        Assert.assertTrue(replicaSetConfig1.getPosition().isInitSync());
     }
 
     private SourceTaskContext emptyTaskContext() {
         return new SourceTaskContext() {
-            @Override
-            public PositionStorageReader positionStorageReader() {
-                return new PositionStorageReader() {
-                    @Override
-                    public ByteBuffer getPosition(ByteBuffer partition) {
-                        return null;
-                    }
-
-                    @Override
-                    public Map<ByteBuffer, ByteBuffer> getPositions(Collection<ByteBuffer> partitions) {
-                        return null;
-                    }
-                };
+
+            @Override public OffsetStorageReader offsetStorageReader() {
+                return new TestPositionStorageReader();
+            }
+
+            @Override public String getConnectorName() {
+                return "mongoSourceConnector";
+            }
+
+            @Override public String getTaskName() {
+                return "mongoSourceTask";
             }
 
             @Override
             public KeyValue configs() {
-                return null;
+                return new DefaultKeyValue();
             }
         };
     }
@@ -101,12 +92,12 @@ public class MongoSourceTaskTest {
     public void testContextStart() throws NoSuchFieldException, IllegalAccessException {
         MongoSourceTask mongoSourceTask = new MongoSourceTask();
         DefaultKeyValue defaultKeyValue = new DefaultKeyValue();
-        defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27027");
+        defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27017");
         defaultKeyValue.put("serverSelectionTimeoutMS", "10");
 
-        Field context = SourceTask.class.getDeclaredField("context");
+        Field context = SourceTask.class.getDeclaredField("sourceTaskContext");
         context.setAccessible(true);
-        context.set(mongoSourceTask, TaskContext());
+        context.set(mongoSourceTask, taskContext());
         mongoSourceTask.start(defaultKeyValue);
 
         Field replicaSetsContext = MongoSourceTask.class.getDeclaredField("replicaSetsContext");
@@ -122,37 +113,28 @@ public class MongoSourceTaskTest {
         replicaSetConfig.setAccessible(true);
         ReplicaSetConfig replicaSetConfig1 = (ReplicaSetConfig) replicaSetConfig.get(replicaSet);
         Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getReplicaSetName(), "test"));
-        Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getHost(), "127.0.0.1:27027"));
-        Assert.assertTrue(replicaSetConfig1.getPosition().getTimeStamp() == 22222222);
-        Assert.assertTrue(replicaSetConfig1.getPosition().getInc() == 222);
-        Assert.assertTrue(!replicaSetConfig1.getPosition().isInitSync());
+        Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getHost(), "127.0.0.1:27017"));
+        Assert.assertTrue(replicaSetConfig1.getPosition().getTimeStamp() == 0);
     }
 
-    private SourceTaskContext TaskContext() {
+    private SourceTaskContext taskContext() {
         return new SourceTaskContext() {
-            @Override
-            public PositionStorageReader positionStorageReader() {
-                return new PositionStorageReader() {
-                    @Override
-                    public ByteBuffer getPosition(ByteBuffer partition) {
-
-                        Map<String, Object> po = new HashMap<>();
-                        po.put("timeStamp", 22222222);
-                        po.put("inc", 222);
-                        po.put("initSync", false);
-                        return ByteBuffer.wrap(JSONObject.toJSONString(po).getBytes());
-                    }
-
-                    @Override
-                    public Map<ByteBuffer, ByteBuffer> getPositions(Collection<ByteBuffer> partitions) {
-                        return null;
-                    }
-                };
+
+            @Override public OffsetStorageReader offsetStorageReader() {
+                return new TestPositionStorageReader();
+            }
+
+            @Override public String getConnectorName() {
+                return "mongoSourceConnector";
+            }
+
+            @Override public String getTaskName() {
+                return "mongoSourceTask";
             }
 
             @Override
             public KeyValue configs() {
-                return null;
+                return new DefaultKeyValue();
             }
         };
     }
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoTest.java
index 7a67c752..3f8a8175 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -23,11 +23,12 @@ import com.mongodb.MongoClientSettings;
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCollection;
-import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.RecordPosition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.data.Struct;
 import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -56,7 +57,7 @@ public class MongoTest {
     @Before
     public void before() {
         MongoClientSettings.Builder builder = MongoClientSettings.builder();
-        builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27027"));
+        builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27017"));
         mongoClient = MongoClients.create(builder.build());
     }
 
@@ -76,17 +77,16 @@ public class MongoTest {
         Assert.assertEquals("test.person", event.getNamespace());
         Assert.assertTrue(11111L == event.getH());
         Assert.assertEquals(OperationType.INSERT, event.getOperationType());
-        Assert.assertEquals(EntryType.CREATE, event.getEntryType());
         Assert.assertEquals(document, event.getEventData().get());
         Assert.assertEquals("testR", event.getReplicaSetName());
 
     }
 
     @Test
-    public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
+    public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException {
         MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
         collection.deleteMany(new Document());
-        int count = 1000;
+        int count = 10;
         List<String> documents = new ArrayList<>(count);
         for (int i = 0; i < count; i++) {
             Document document = new Document();
@@ -103,6 +103,8 @@ public class MongoTest {
         insterest.put("test", collections);
         sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
         ReplicaSetConfig replicaSetConfig = new ReplicaSetConfig("", "test", "localhost");
+        Position position = new Position();
+        replicaSetConfig.setPosition(position);
         ReplicaSetsContext replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
         ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
         Field running = ReplicaSet.class.getDeclaredField("running");
@@ -112,27 +114,16 @@ public class MongoTest {
         initSync.start();
         int syncCount = 0;
         while (syncCount < count) {
-            Collection<SourceDataEntry> sourceDataEntries = replicaSetsContext.poll();
-            Assert.assertTrue(sourceDataEntries.size() > 0);
-            for (SourceDataEntry sourceDataEntry : sourceDataEntries) {
-                ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
-                Assert.assertEquals("test", new String(sourcePartition.array()));
-                ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
-                Position position = new Position();
-                position.setInitSync(true);
-                position.setTimeStamp(0);
-                position.setInc(0);
-                Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), Position.class));
-                EntryType entryType = sourceDataEntry.getEntryType();
-                Assert.assertEquals(EntryType.CREATE, entryType);
-                String queueName = sourceDataEntry.getQueueName();
-                Assert.assertEquals("test-person", queueName);
-                Schema schema = sourceDataEntry.getSchema();
-                Assert.assertTrue(schema.getFields().size() == 2);
-                Object[] payload = sourceDataEntry.getPayload();
-                Assert.assertTrue(payload.length == 2);
-                Assert.assertEquals(payload[0].toString(), "test.person");
-                Assert.assertTrue(documents.contains(JSONObject.parseObject(payload[1].toString(), Document.class).get("_id", JSONObject.class).getString("$oid")));
+            Collection<ConnectRecord> connectRecords = replicaSetsContext.poll();
+            Assert.assertTrue(connectRecords.size() > 0);
+            for (ConnectRecord connectRecord : connectRecords) {
+                final Struct data = (Struct) connectRecord.getData();
+                final Object[] values = data.getValues();
+                Schema schema = connectRecord.getSchema();
+                Assert.assertTrue(schema.getFields().size() == 4);
+                Assert.assertTrue(values.length == 4);
+                final Map<String, ?> partition = connectRecord.getPosition().getPartition().getPartition();
+                Assert.assertEquals("test", partition.get(Constants.REPLICA_SET_NAME));
                 syncCount++;
             }
 
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
index f19768eb..a723106e 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
@@ -39,7 +39,7 @@ public class ReplicaContextTest {
 
     @Test
     public void testCreateMongoClient() {
-        MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027"));
+        MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "", "127.0.0.1:27017"));
         MongoIterable<String> collectionNames = mongoClient.getDatabase("local").listCollectionNames();
         MongoCursor<String> iterator = collectionNames.iterator();
         while (iterator.hasNext()) {
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
index c19098bf..d3edef67 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
@@ -39,7 +39,7 @@ public class ReplicaSetTest {
     @Before
     public void before() {
         this.sourceTaskConfig = new SourceTaskConfig();
-        this.replicaSetConfig = new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027");
+        this.replicaSetConfig = new ReplicaSetConfig("shardName1", "", "127.0.0.1:27017");
         this.replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
         this.replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
     }
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/TestPositionStorageReader.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/TestPositionStorageReader.java
new file mode 100644
index 00000000..509c1a46
--- /dev/null
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/TestPositionStorageReader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.connect.mongo;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import java.util.Collection;
+import java.util.Map;
+
+public class TestPositionStorageReader implements OffsetStorageReader {
+
+    @Override public RecordOffset readOffset(RecordPartition partition) {
+        return null;
+    }
+
+    @Override public Map<RecordPartition, RecordOffset> readOffsets(Collection<RecordPartition> partitions) {
+        return null;
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
index 69bcacf6..366eb3f2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
@@ -48,7 +48,7 @@ public class WorkerConfig {
      * config example:
      * namesrvAddr = localhost:9876
      */
-    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+     private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
 
     /**
      * Http port for REST API.