You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:24 UTC

[rocketmq-connect] 01/13: add junit test and modify some code

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit f629088918a2cc725326b981749798c0ba4b64de
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Mon Aug 5 15:01:55 2019 +0800

    add junit test and modify some code
---
 pom.xml                                            | 180 +++++++++++++++++++++
 .../connect/mongo/MongoReplicatorConfig.java       | 157 ++++++++++++++++++
 .../mongo/connector/MongoSourceConnector.java      |  61 +++++++
 .../connect/mongo/connector/MongoSourceTask.java   | 148 +++++++++++++++++
 .../connect/mongo/initsync/CollectionMeta.java     |  41 +++++
 .../apache/connect/mongo/initsync/InitSync.java    | 146 +++++++++++++++++
 .../apache/connect/mongo/replicator/Filter.java    |  74 +++++++++
 .../connect/mongo/replicator/MongoReplicator.java  | 152 +++++++++++++++++
 .../connect/mongo/replicator/ReplicatorTask.java   |  84 ++++++++++
 .../java/org/apache/connect/mongo/FilterTest.java  |  67 ++++++++
 .../connect/mongo/MongoSourceConnectorTest.java    |  45 ++++++
 .../org/apache/connect/mongo/ReplicatorTest.java   |  33 ++++
 12 files changed, 1188 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..7ea7ab1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,180 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-mongo</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>connect-mongo</name>
+    <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-mongo</url>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+        </plugins>
+    </build>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongodb-driver</artifactId>
+            <version>3.10.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.26</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+            <version>1.0.0-alpha</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
new file mode 100644
index 0000000..18a834f
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
@@ -0,0 +1,157 @@
+package org.apache.connect.mongo;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Holds the settings of a MongoDB replication task and fills them in
+ * reflectively from an OpenMessaging {@link KeyValue}.
+ *
+ * <p>Every public single-argument {@code setXxx} method is treated as a
+ * configurable property whose key is {@code xxx} (first letter lower-cased).
+ */
+public class MongoReplicatorConfig {
+
+    /** Configuration keys that must be present in a connector configuration. */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<>();
+
+    static {
+        // A plain static initializer avoids the anonymous HashSet subclass that
+        // double-brace initialization would create.
+        REQUEST_CONFIG.add("mongoAddr");
+        REQUEST_CONFIG.add("mongoPort");
+    }
+
+    private String mongoAddr;
+    private int mongoPort;
+    private String mongoUserName;
+    private String mongoPassWord;
+    /** Comma separated database names; blank means no restriction. */
+    private String interestDB;
+    /** Comma separated collection names; blank means no restriction. */
+    private String interestCollection;
+    private long positionTimeStamp;
+    private int positionInc;
+    private String dataSync;
+    /** Number of copy threads; defaults to the number of available processors. */
+    private int copyThread = Runtime.getRuntime().availableProcessors();
+
+    public int getPositionInc() {
+        return positionInc;
+    }
+
+    public void setPositionInc(int positionInc) {
+        this.positionInc = positionInc;
+    }
+
+    public int getCopyThread() {
+        return copyThread;
+    }
+
+    public void setCopyThread(int copyThread) {
+        this.copyThread = copyThread;
+    }
+
+    public long getPositionTimeStamp() {
+        return positionTimeStamp;
+    }
+
+    public void setPositionTimeStamp(long positionTimeStamp) {
+        this.positionTimeStamp = positionTimeStamp;
+    }
+
+    public String getInterestDB() {
+        return interestDB;
+    }
+
+    public void setInterestDB(String interestDB) {
+        this.interestDB = interestDB;
+    }
+
+    public String getInterestCollection() {
+        return interestCollection;
+    }
+
+    public void setInterestCollection(String interestCollection) {
+        this.interestCollection = interestCollection;
+    }
+
+    public String getMongoAddr() {
+        return mongoAddr;
+    }
+
+    public void setMongoAddr(String mongoAddr) {
+        this.mongoAddr = mongoAddr;
+    }
+
+    public int getMongoPort() {
+        return mongoPort;
+    }
+
+    public void setMongoPort(int mongoPort) {
+        this.mongoPort = mongoPort;
+    }
+
+    public String getMongoUserName() {
+        return mongoUserName;
+    }
+
+    public void setMongoUserName(String mongoUserName) {
+        this.mongoUserName = mongoUserName;
+    }
+
+    public String getMongoPassWord() {
+        return mongoPassWord;
+    }
+
+    public void setMongoPassWord(String mongoPassWord) {
+        this.mongoPassWord = mongoPassWord;
+    }
+
+    public String getDataSync() {
+        return dataSync;
+    }
+
+    public void setDataSync(String dataSync) {
+        this.dataSync = dataSync;
+    }
+
+    /**
+     * Copies every matching entry of {@code props} into this object.
+     *
+     * @param props key/value configuration; keys are the property names
+     *              derived from this class's setters
+     */
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    /**
+     * Populates {@code object} by invoking each public {@code setXxx} method
+     * whose derived key {@code xxx} has a non-null value in {@code p}.
+     *
+     * <p>Population is best effort: a value that cannot be converted or a
+     * setter that cannot be invoked is skipped and the field keeps its
+     * previous value.
+     *
+     * @param p      source of string values
+     * @param object target whose setters are invoked
+     */
+    private void properties2Object(final KeyValue p, final Object object) {
+        for (Method method : object.getClass().getMethods()) {
+            String mn = method.getName();
+            // At least one character must follow the "set" prefix to form a key.
+            if (!mn.startsWith("set") || mn.length() <= 3) {
+                continue;
+            }
+            try {
+                // Character.toLowerCase is locale-independent, unlike
+                // String.toLowerCase(), which maps 'I' to a dotless i under a
+                // Turkish default locale and would break keys such as "interestDB".
+                String key = Character.toLowerCase(mn.charAt(3)) + mn.substring(4);
+                String property = p.getString(key);
+                if (property == null) {
+                    continue;
+                }
+                Class<?>[] pt = method.getParameterTypes();
+                if (pt.length == 0) {
+                    continue;
+                }
+                String cn = pt[0].getSimpleName();
+                Object arg;
+                if (cn.equals("int") || cn.equals("Integer")) {
+                    arg = Integer.parseInt(property);
+                } else if (cn.equals("long") || cn.equals("Long")) {
+                    arg = Long.parseLong(property);
+                } else if (cn.equals("double") || cn.equals("Double")) {
+                    arg = Double.parseDouble(property);
+                } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                    arg = Boolean.parseBoolean(property);
+                } else if (cn.equals("float") || cn.equals("Float")) {
+                    arg = Float.parseFloat(property);
+                } else if (cn.equals("String")) {
+                    arg = property;
+                } else {
+                    // Unsupported parameter type: leave the property untouched.
+                    continue;
+                }
+                method.invoke(object, arg);
+            } catch (Exception ignored) {
+                // Deliberately best effort: an unparsable value or a failing
+                // setter must not abort loading of the remaining properties.
+                // Only Exception is caught so JVM Errors still propagate.
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
new file mode 100644
index 0000000..9e659c9
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -0,0 +1,61 @@
+package org.apache.connect.mongo.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Source connector that validates the connector configuration and hands it,
+ * unchanged, to a single {@link MongoSourceTask}.
+ */
+public class MongoSourceConnector extends SourceConnector {
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+    private KeyValue keyValueConfig;
+
+    /**
+     * Checks that every key listed in {@link MongoReplicatorConfig#REQUEST_CONFIG}
+     * is present and, if so, remembers the configuration.
+     *
+     * @param config connector configuration to validate
+     * @return an empty string on success, otherwise a message naming the first
+     *         missing key encountered
+     */
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        for (String requiredKey : MongoReplicatorConfig.REQUEST_CONFIG) {
+            if (config.containsKey(requiredKey)) {
+                continue;
+            }
+            return "Request config key: " + requiredKey;
+        }
+        keyValueConfig = config;
+        return "";
+    }
+
+    /** Logs the stored configuration; no other work is done here. */
+    @Override
+    public void start() {
+        logger.info("start mongo source connector:{}", keyValueConfig);
+    }
+
+    /** No-op. */
+    @Override
+    public void stop() {
+    }
+
+    /** No-op. */
+    @Override
+    public void pause() {
+    }
+
+    /** No-op. */
+    @Override
+    public void resume() {
+    }
+
+    /** @return the task implementation class, {@link MongoSourceTask} */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MongoSourceTask.class;
+    }
+
+    /**
+     * @return a new list containing the one stored configuration
+     */
+    @Override
+    public List<KeyValue> taskConfigs() {
+        List<KeyValue> taskConfigList = new ArrayList<>();
+        taskConfigList.add(keyValueConfig);
+        return taskConfigList;
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
new file mode 100644
index 0000000..e116cfb
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -0,0 +1,148 @@
+package org.apache.connect.mongo.connector;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.source.SourceTask;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.apache.connect.mongo.replicator.MongoReplicator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Source task that takes {@link ReplicationEvent}s from a
+ * {@link MongoReplicator} queue and converts each one into a
+ * {@link SourceDataEntry}.
+ */
+public class MongoSourceTask extends SourceTask {
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private MongoReplicator mongoReplicator;
+
+    private MongoReplicatorConfig replicatorConfig;
+
+    /** Partition key for stored positions: mongo address followed by port. */
+    private String mongoSource;
+
+    /**
+     * Converts at most one queued replication event into a data entry.
+     *
+     * @return a list holding one entry, or an empty list when the queue is empty
+     */
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        ReplicationEvent event = mongoReplicator.getQueue().poll();
+        if (event == null) {
+            return res;
+        }
+
+        JSONObject position = position(event);
+        Schema schema = new Schema();
+        schema.setDataSource(event.getDatabaseName());
+        schema.setName(event.getCollectionName());
+        schema.setFields(new ArrayList<>());
+        buildFields(schema);
+        DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+        dataEntryBuilder.timestamp(System.currentTimeMillis())
+                .queue(event.getNamespace())
+                .entryType(event.getEntryType());
+
+        // Enum constants are singletons, so identity comparison is the idiomatic check.
+        if (event.getOperationType() == OperationType.CREATED) {
+            dataEntryBuilder.putFiled(Constants.CREATED, event.getDocument().toJson());
+            dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace());
+        } else {
+            dataEntryBuilder.putFiled(Constants.OPERATIONTYPE, event.getOperationType().name());
+            dataEntryBuilder.putFiled(Constants.TIMESTAMP, event.getTimestamp().getValue());
+            dataEntryBuilder.putFiled(Constants.VERSION, event.getV());
+            dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace());
+            dataEntryBuilder.putFiled(Constants.PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
+            dataEntryBuilder.putFiled(Constants.OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
+        }
+        SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                ByteBuffer.wrap(mongoSource.getBytes(StandardCharsets.UTF_8)),
+                ByteBuffer.wrap(position.toJSONString().getBytes(StandardCharsets.UTF_8)));
+        res.add(sourceDataEntry);
+        return res;
+    }
+
+    /**
+     * Loads the configuration, restores a previously stored position if one
+     * exists (otherwise requests an initial sync) and starts the replicator.
+     *
+     * <p>A failure is logged and the replicator is shut down; the exception is
+     * not rethrown.
+     *
+     * @param config task configuration
+     */
+    @Override
+    public void start(KeyValue config) {
+        try {
+            replicatorConfig = new MongoReplicatorConfig();
+            replicatorConfig.load(config);
+            mongoReplicator = new MongoReplicator(replicatorConfig);
+            mongoSource = new StringBuilder()
+                    .append(replicatorConfig.getMongoAddr())
+                    .append(replicatorConfig.getMongoPort()).toString();
+            // Same charset as poll() uses when writing the partition key, so the
+            // lookup does not depend on the platform default encoding.
+            ByteBuffer position = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
+                    mongoSource.getBytes(StandardCharsets.UTF_8)));
+
+            if (position != null && position.array().length > 0) {
+                String positionJson = new String(position.array(), StandardCharsets.UTF_8);
+                JSONObject jsonObject = JSONObject.parseObject(positionJson);
+                // NOTE(review): these literal keys must match what position() writes
+                // via Constants.POSITION_TIMESTAMP / POSITION_INC - confirm.
+                replicatorConfig.setPositionTimeStamp(jsonObject.getLongValue("timeStamp"));
+                replicatorConfig.setPositionInc(jsonObject.getIntValue("inc"));
+            } else {
+                replicatorConfig.setDataSync(Constants.INITIAL);
+            }
+            mongoReplicator.start();
+        } catch (Throwable throwable) {
+            logger.error("task start error", throwable);
+            // Only tear down on failure; stopping unconditionally would shut the
+            // replicator down right after a successful start.
+            stop();
+        }
+    }
+
+    /** Shuts the replicator down if it was created. */
+    @Override
+    public void stop() {
+        logger.info("shut down.....");
+        // start() may have failed before the replicator was constructed.
+        if (mongoReplicator != null) {
+            mongoReplicator.shutdown();
+        }
+    }
+
+    @Override
+    public void pause() {
+        mongoReplicator.pause();
+    }
+
+    @Override
+    public void resume() {
+        mongoReplicator.resume();
+    }
+
+    /**
+     * Appends the fixed list of fields to {@code schema}, each with its own index.
+     *
+     * @param schema schema whose field list is extended in place
+     */
+    private void buildFields(Schema schema) {
+        Field op = new Field(0, Constants.OPERATIONTYPE, FieldType.STRING);
+        schema.getFields().add(op);
+        Field time = new Field(1, Constants.TIMESTAMP, FieldType.INT64);
+        schema.getFields().add(time);
+        Field v = new Field(2, Constants.VERSION, FieldType.INT32);
+        schema.getFields().add(v);
+        Field namespace = new Field(3, Constants.NAMESPACE, FieldType.STRING);
+        schema.getFields().add(namespace);
+        Field operation = new Field(4, Constants.CREATED, FieldType.STRING);
+        schema.getFields().add(operation);
+        Field patch = new Field(5, Constants.PATCH, FieldType.STRING);
+        schema.getFields().add(patch);
+        // Index 6: index 5 is already taken by the patch field.
+        Field objectId = new Field(6, Constants.OBJECTID, FieldType.STRING);
+        schema.getFields().add(objectId);
+    }
+
+    /**
+     * Builds the position object stored alongside an entry.
+     *
+     * @param event event the position belongs to (currently not consulted)
+     * @return JSON object with timestamp 0, increment 0 and the init-sync flag set
+     */
+    private JSONObject position(ReplicationEvent event) {
+        // NOTE(review): every operation type records the same placeholder
+        // position. Events other than CREATED presumably should store their own
+        // timestamp/increment with the init-sync flag cleared - TODO confirm
+        // against ReplicationEvent before changing the stored format.
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.put(Constants.POSITION_TIMESTAMP, 0);
+        jsonObject.put(Constants.POSITION_INC, 0);
+        jsonObject.put(Constants.INITSYNC, true);
+        return jsonObject;
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
new file mode 100644
index 0000000..018418c
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
@@ -0,0 +1,41 @@
+package org.apache.connect.mongo.initsync;
+
+public class CollectionMeta {
+
+    private String databaseName;
+    private String collectionName;
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public void setDatabaseName(String databaseName) {
+        this.databaseName = databaseName;
+    }
+
+    public String getCollectionName() {
+        return collectionName;
+    }
+
+    public void setCollectionName(String collectionName) {
+        this.collectionName = collectionName;
+    }
+
+    public CollectionMeta(String databaseName, String collectionName) {
+
+        this.databaseName = databaseName;
+        this.collectionName = collectionName;
+    }
+
+    public String getNameSpace() {
+        return databaseName + "." + collectionName;
+    }
+
+    @Override
+    public String toString() {
+        return "CollectionMeta{" +
+                "databaseName='" + databaseName + '\'' +
+                ", collectionName='" + collectionName + '\'' +
+                '}';
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
new file mode 100644
index 0000000..4b3a238
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -0,0 +1,146 @@
+package org.apache.connect.mongo.initsync;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoIterable;
+import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.apache.connect.mongo.replicator.Filter;
+import org.apache.connect.mongo.replicator.MongoReplicator;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InitSync {
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private MongoReplicatorConfig mongoReplicatorConfig;
+    private ExecutorService copyExecutor;
+    private MongoClient mongoClient;
+    private Filter filter;
+    private int copyThreadCount;
+    private Set<String> interestDataBases;
+    private Set<CollectionMeta> interestCollections;
+    private CountDownLatch countDownLatch;
+    private MongoReplicator mongoReplicator;
+
+    public InitSync(MongoReplicatorConfig mongoReplicatorConfig, MongoClient mongoClient, Filter filter, MongoReplicator mongoReplicator) {
+        this.mongoReplicatorConfig = mongoReplicatorConfig;
+        this.mongoClient = mongoClient;
+        this.filter = filter;
+        this.mongoReplicator = mongoReplicator;
+        init();
+    }
+
+    public void start() {
+        for (CollectionMeta collectionMeta : interestCollections) {
+            copyExecutor.submit(new CopyRunner(mongoClient, countDownLatch, collectionMeta, mongoReplicator));
+        }
+        try {
+            countDownLatch.await();
+        } catch (Exception e) {
+        } finally {
+            copyExecutor.shutdown();
+        }
+    }
+
+    private void init() {
+        interestDataBases = getInterestDataBase();
+        interestCollections = getInterestCollection(interestDataBases);
+        copyThreadCount = Math.min(interestCollections.size(), mongoReplicatorConfig.getCopyThread());
+        copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() {
+
+            AtomicInteger threads = new AtomicInteger();
+
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "copy_collection_thread_" + threads.incrementAndGet());
+            }
+        });
+        countDownLatch = new CountDownLatch(interestCollections.size());
+    }
+
+    private Set<CollectionMeta> getInterestCollection(Set<String> interestDataBases) {
+        Set<CollectionMeta> res = new HashSet<>();
+        for (String interestDataBase : interestDataBases) {
+            MongoIterable<String> collectionNames = mongoClient.getDatabase(interestDataBase).listCollectionNames();
+            MongoCursor<String> iterator = collectionNames.iterator();
+            while (iterator.hasNext()) {
+                String collectionName = iterator.next();
+                if (filter.filterCollectionName(collectionName)) {
+                    CollectionMeta collectionMeta = new CollectionMeta(interestDataBase, collectionName);
+                    res.add(collectionMeta);
+                }
+            }
+        }
+
+        return res;
+
+    }
+
+    private Set<String> getInterestDataBase() {
+        Set<String> res = new HashSet<>();
+        MongoIterable<String> databaseNames = mongoClient.listDatabaseNames();
+        MongoCursor<String> iterator = databaseNames.iterator();
+        while (iterator.hasNext()) {
+            String dataBaseName = iterator.next();
+            if (filter.filterDatabaseName(dataBaseName)) {
+                res.add(dataBaseName);
+            }
+        }
+
+        return res;
+    }
+
+    class CopyRunner implements Runnable {
+
+        private MongoClient mongoClient;
+        private CountDownLatch countDownLatch;
+        private CollectionMeta collectionMeta;
+        private MongoReplicator mongoReplicator;
+
+        public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, MongoReplicator mongoReplicator) {
+            this.mongoClient = mongoClient;
+            this.countDownLatch = countDownLatch;
+            this.collectionMeta = collectionMeta;
+            this.mongoReplicator = mongoReplicator;
+        }
+
+        @Override
+        public void run() {
+
+            try {
+
+                MongoCursor<Document> mongoCursor = mongoClient.getDatabase(collectionMeta.getDatabaseName())
+                        .getCollection(collectionMeta.getCollectionName())
+                        .find()
+                        .batchSize(200)
+                        .iterator();
+
+                while (mongoReplicator.isRuning() && mongoCursor.hasNext()) {
+                    Document document = mongoCursor.next();
+                    ReplicationEvent event = DocumentConvertEvent.convert(document);
+                    event.setOperationType(OperationType.CREATED);
+                    event.setNamespace(collectionMeta.getNameSpace());
+                    mongoReplicator.publishEvent(event);
+                }
+            } finally {
+                countDownLatch.countDown();
+            }
+            logger.info("database:{}, collection:{}, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName());
+        }
+    }
+
+}
+
+
+
+
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
new file mode 100644
index 0000000..05dec54
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -0,0 +1,74 @@
+package org.apache.connect.mongo.replicator;
+
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+
+import java.util.function.Function;
+
+/**
+ * Decides which databases, collections and replication events are of
+ * interest, based on the comma separated lists in
+ * {@link MongoReplicatorConfig}.
+ */
+public class Filter {
+
+    private Function<String, Boolean> dbFilter;
+    private Function<String, Boolean> collectionFilter;
+    private Function<OperationType, Boolean> noopFilter;
+
+    /**
+     * Builds the three predicates. A blank interest list (checked once, here)
+     * yields an accept-all predicate; otherwise the list is re-read from the
+     * configuration on every call and a blank name is always accepted.
+     *
+     * @param mongoReplicatorConfig configuration holding the interest lists
+     */
+    public Filter(MongoReplicatorConfig mongoReplicatorConfig) {
+        if (StringUtils.isBlank(mongoReplicatorConfig.getInterestDB())) {
+            dbFilter = (name) -> true;
+        } else {
+            dbFilter = (name) -> StringUtils.isBlank(name)
+                    || listedIn(mongoReplicatorConfig.getInterestDB(), name);
+        }
+
+        if (StringUtils.isBlank(mongoReplicatorConfig.getInterestCollection())) {
+            collectionFilter = (name) -> true;
+        } else {
+            collectionFilter = (name) -> StringUtils.isBlank(name)
+                    || listedIn(mongoReplicatorConfig.getInterestCollection(), name);
+        }
+
+        // Accept everything except NOOP operations.
+        noopFilter = (type) -> type.ordinal() != OperationType.NOOP.ordinal();
+    }
+
+    /**
+     * @param commaSeparated comma separated list of names
+     * @param candidate      name to look for
+     * @return whether {@code candidate} equals one of the listed names
+     */
+    private static boolean listedIn(String commaSeparated, String candidate) {
+        String[] names = StringUtils.split(commaSeparated, ",");
+        return ArrayUtils.contains(names, candidate);
+    }
+
+    /** @return whether the database name is accepted */
+    public boolean filterDatabaseName(String dataBaseName) {
+        return dbFilter.apply(dataBaseName);
+    }
+
+    /** @return whether the collection name is accepted */
+    public boolean filterCollectionName(String collectionName) {
+        return collectionFilter.apply(collectionName);
+    }
+
+    /**
+     * @return whether the event's database and collection are accepted and its
+     *         operation type is not NOOP
+     */
+    public boolean filterEvent(ReplicationEvent event) {
+        if (!dbFilter.apply(event.getDatabaseName())) {
+            return false;
+        }
+        if (!collectionFilter.apply(event.getCollectionName())) {
+            return false;
+        }
+        return noopFilter.apply(event.getOperationType());
+    }
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
new file mode 100644
index 0000000..326ba03
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
@@ -0,0 +1,152 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.*;
+import com.mongodb.client.*;
+import com.mongodb.client.MongoClient;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.connect.mongo.replicator.Constants.*;
+
+
+public class MongoReplicator {
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private AtomicBoolean running = new AtomicBoolean();
+
+    private MongoReplicatorConfig mongoReplicatorConfig;
+
+    private MongoClientSettings clientSettings;
+
+    private ConnectionString connectionString;
+
+    private MongoClient mongoClient;
+
+    private BlockingQueue<ReplicationEvent> queue = new LinkedBlockingQueue<>();
+
+    private Filter filter;
+
+    private ExecutorService executorService;
+
+    private volatile boolean pause = false;
+
+    public MongoReplicator(MongoReplicatorConfig mongoReplicatorConfig) {
+        this.mongoReplicatorConfig = mongoReplicatorConfig;
+        this.filter = new Filter(mongoReplicatorConfig);
+        this.executorService = Executors.newSingleThreadExecutor((r) ->new Thread(r, "real_time_replica_thread"));
+
+        buildConnectionString();
+    }
+
+    public void start() {
+
+        try {
+            if (!running.compareAndSet(false, true)) {
+                logger.info("the java mongo replica already start");
+                return;
+            }
+
+            this.clientSettings = MongoClientSettings.builder().applicationName(APPLICATION_NAME)
+                    .applyConnectionString(connectionString)
+                    .retryWrites(true)
+                    .build();
+            this.mongoClient = MongoClients.create(clientSettings);
+            this.isReplicaMongo();
+            executorService.submit(new ReplicatorTask(this, mongoClient, mongoReplicatorConfig, filter));
+        }catch (Exception e) {
+            logger.info("start replicator error", e);
+        }finally {
+            shutdown();
+        }
+
+    }
+
+
+    private void buildConnectionString() {
+        checkConfig();
+        StringBuilder sb = new StringBuilder();
+        sb.append("mongodb://");
+        if (StringUtils.isNotBlank(mongoReplicatorConfig.getMongoUserName())
+                && StringUtils.isNotBlank(mongoReplicatorConfig.getMongoPassWord())) {
+            sb.append(mongoReplicatorConfig.getMongoUserName());
+            sb.append(":");
+            sb.append(mongoReplicatorConfig.getMongoPassWord());
+            sb.append("@");
+
+        }
+        sb.append(mongoReplicatorConfig.getMongoAddr());
+        sb.append(":");
+        sb.append(mongoReplicatorConfig.getMongoPort());
+
+        this.connectionString = new ConnectionString(sb.toString());
+    }
+
+    private void checkConfig() {
+        Validate.notBlank(mongoReplicatorConfig.getMongoAddr(), "mongo url is blank");
+        Validate.isTrue(mongoReplicatorConfig.getMongoPort() > 0 && mongoReplicatorConfig.getMongoPort() < 65535, "mongo port should >0 and <65535");
+
+    }
+
+    private boolean isReplicaMongo() {
+        MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
+        MongoIterable<String> collectionNames = local.listCollectionNames();
+        for (String collectionName : collectionNames) {
+            if (MONGO_OPLOG_RS.equals(collectionName)) {
+                return true;
+            }
+        }
+        this.shutdown();
+        throw new IllegalStateException(String.format("url:%s, port:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getMongoPort()));
+    }
+
+    public void shutdown() {
+        if (running.compareAndSet(true, false)) {
+            if (!this.executorService.isShutdown()) {
+                executorService.shutdown();
+            }
+            if (this.mongoClient != null) {
+                this.mongoClient.close();
+            }
+        }
+
+    }
+
+    public void publishEvent(ReplicationEvent replicationEvent) {
+        while (true) {
+            try {
+                queue.put(replicationEvent);
+                break;
+            } catch (Exception e) {
+            }
+        }
+    }
+
+
+    public void pause() {
+        pause = true;
+    }
+
+    public void resume() {
+        pause = false;
+    }
+
+    public boolean isPause() {
+        return pause;
+    }
+
+    public boolean isRuning() {
+        return running.get();
+    }
+
+    public BlockingQueue<ReplicationEvent> getQueue() {
+        return queue;
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
new file mode 100644
index 0000000..9e4c67b
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -0,0 +1,84 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.CursorType;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.apache.connect.mongo.initsync.InitSync;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ReplicatorTask implements Runnable {
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private MongoReplicator mongoReplicator;
+
+    private MongoClient mongoClient;
+
+    private MongoReplicatorConfig mongoReplicatorConfig;
+
+    private Filter filter;
+
+    public ReplicatorTask(MongoReplicator mongoReplicator, MongoClient mongoClient, MongoReplicatorConfig mongoReplicatorConfig, Filter filter) {
+        this.mongoReplicator = mongoReplicator;
+        this.mongoReplicatorConfig = mongoReplicatorConfig;
+        this.mongoClient = mongoClient;
+        this.filter = filter;
+    }
+
+    @Override
+    public void run() {
+
+        if (Constants.INITIAL.equals(mongoReplicatorConfig.getDataSync())) {
+            InitSync initSync = new InitSync(mongoReplicatorConfig, mongoClient, filter, mongoReplicator);
+            initSync.start();
+        }
+
+        MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
+        FindIterable<Document> iterable;
+        if (mongoReplicatorConfig.getPositionTimeStamp() > 0 && mongoReplicatorConfig.getPositionTimeStamp() < System.currentTimeMillis()) {
+            iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
+                    Filters.gt("ts", mongoReplicatorConfig.getPositionTimeStamp()));
+        } else {
+            iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
+        }
+        MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1))
+                .noCursorTimeout(true)
+                .cursorType(CursorType.TailableAwait)
+                .batchSize(200)
+                .iterator();
+
+        while (mongoReplicator.isRuning()) {
+            try {
+                executorCursor(cursor);
+                Thread.sleep(100);
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                logger.error("mongoReplicator shutdown.....");
+                mongoReplicator.shutdown();
+            }
+        }
+    }
+
+
+    private void executorCursor(MongoCursor<Document> cursor) {
+        while (cursor.hasNext() && !mongoReplicator.isPause()) {
+            Document document = cursor.next();
+            ReplicationEvent event = DocumentConvertEvent.convert(document);
+            if (filter.filterEvent(event)) {
+                mongoReplicator.publishEvent(event);
+            }
+        }
+    }
+
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
new file mode 100644
index 0000000..24f36d9
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -0,0 +1,67 @@
+package org.apache.connect.mongo;
+
+import org.apache.connect.mongo.replicator.Filter;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FilterTest {
+
+
+    private MongoReplicatorConfig config;
+
+
+    @Before
+    public void init() {
+        config = new MongoReplicatorConfig();
+
+    }
+
+    @Test
+    public void testSpecialDb() {
+        config.setInterestDB("test,admin");
+        Filter filter = new Filter(config);
+        Assert.assertTrue(filter.filterDatabaseName("test"));
+        Assert.assertFalse(filter.filterDatabaseName("test01"));
+    }
+
+
+    @Test
+    public void testBlankDb() {
+        Filter filter = new Filter(config);
+        Assert.assertTrue(filter.filterDatabaseName("test"));
+        Assert.assertTrue(filter.filterDatabaseName("test01"));
+    }
+
+
+    @Test
+    public void testSpecialCollection() {
+        config.setInterestCollection("test,admin");
+        Filter filter = new Filter(config);
+        Assert.assertTrue(filter.filterCollectionName("test"));
+        Assert.assertFalse(filter.filterCollectionName("test01"));
+    }
+
+
+    @Test
+    public void testBlankCollection() {
+        Filter filter = new Filter(config);
+        Assert.assertTrue(filter.filterCollectionName("test"));
+        Assert.assertTrue(filter.filterCollectionName("test01"));
+    }
+
+
+    @Test
+    public void testFilterEvent() {
+        Filter filter = new Filter(config);
+        ReplicationEvent replicationEvent = new ReplicationEvent();
+        replicationEvent.setOperationType(OperationType.NOOP);
+        Assert.assertFalse(filter.filterEvent(replicationEvent));
+        replicationEvent.setOperationType(OperationType.DBCOMMAND);
+        Assert.assertTrue(filter.filterEvent(replicationEvent));
+    }
+
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
new file mode 100644
index 0000000..04e1374
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -0,0 +1,45 @@
+package org.apache.connect.mongo;
+
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.connector.MongoSourceConnector;
+import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link MongoSourceConnector}: the advertised task class and
+ * validation of required configuration keys.
+ */
+public class MongoSourceConnectorTest {
+
+    private MongoSourceConnector mongoSourceConnector;
+    private DefaultKeyValue keyValue;
+
+    /** Creates a fresh connector and an empty configuration for every test. */
+    @Before
+    public void before() {
+        mongoSourceConnector = new MongoSourceConnector();
+        keyValue = new DefaultKeyValue();
+    }
+
+    /** The connector must report {@link MongoSourceTask} as its task class. */
+    @Test
+    public void takeClass() {
+        // JUnit's contract is assertEquals(expected, actual); keeping that order
+        // makes the failure message name the right side as "expected".
+        Assert.assertEquals(MongoSourceTask.class, mongoSourceConnector.taskClass());
+    }
+
+
+    /**
+     * A configuration missing a required key yields an error message; once both
+     * address and port are present, the returned message is blank.
+     */
+    @Test
+    public void verifyConfig() {
+        keyValue.put("mongoAddr", "127.0.0.1");
+        String s = mongoSourceConnector.verifyAndSetConfig(keyValue);
+        Assert.assertTrue(s.contains("Request config key:"));
+        keyValue.put("mongoPort", 27017);
+        s = mongoSourceConnector.verifyAndSetConfig(keyValue);
+        Assert.assertTrue(StringUtils.isBlank(s));
+    }
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicatorTest.java b/src/test/java/org/apache/connect/mongo/ReplicatorTest.java
new file mode 100644
index 0000000..b0319f1
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/ReplicatorTest.java
@@ -0,0 +1,33 @@
+package org.apache.connect.mongo;
+
+import org.apache.connect.mongo.replicator.MongoReplicator;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link MongoReplicator} construction and start-up.
+ */
+public class ReplicatorTest {
+
+    private MongoReplicatorConfig config;
+
+    /** Gives every test a fresh, empty configuration. */
+    @Before
+    public void before() {
+        config = new MongoReplicatorConfig();
+    }
+
+    /** An address without a port is expected to fail with IllegalArgumentException. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testNoPort() {
+        config.setMongoAddr("127.0.0.1");
+        MongoReplicator replicator = new MongoReplicator(config);
+        replicator.start();
+    }
+
+    /** With address and port set, building and starting must not throw. */
+    @Test
+    public void testNoPort1() {
+        config.setMongoAddr("127.0.0.1");
+        config.setMongoPort(27012);
+        MongoReplicator replicator = new MongoReplicator(config);
+        replicator.start();
+    }
+
+}