You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:24 UTC
[rocketmq-connect] 01/13: add junit test and modify some code
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit f629088918a2cc725326b981749798c0ba4b64de
Author: 李平 <lp...@alibaba-inc.com>
AuthorDate: Mon Aug 5 15:01:55 2019 +0800
add junit test and modify some code
---
pom.xml | 180 +++++++++++++++++++++
.../connect/mongo/MongoReplicatorConfig.java | 157 ++++++++++++++++++
.../mongo/connector/MongoSourceConnector.java | 61 +++++++
.../connect/mongo/connector/MongoSourceTask.java | 148 +++++++++++++++++
.../connect/mongo/initsync/CollectionMeta.java | 41 +++++
.../apache/connect/mongo/initsync/InitSync.java | 146 +++++++++++++++++
.../apache/connect/mongo/replicator/Filter.java | 74 +++++++++
.../connect/mongo/replicator/MongoReplicator.java | 152 +++++++++++++++++
.../connect/mongo/replicator/ReplicatorTask.java | 84 ++++++++++
.../java/org/apache/connect/mongo/FilterTest.java | 67 ++++++++
.../connect/mongo/MongoSourceConnectorTest.java | 45 ++++++
.../org/apache/connect/mongo/ReplicatorTest.java | 33 ++++
12 files changed, 1188 insertions(+)
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..7ea7ab1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,180 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-mongo</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <name>connect-mongo</name>
+ <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-mongo</url>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <issueManagement>
+ <system>jira</system>
+ <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+ </issueManagement>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <!-- Compiler settings properties -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ </plugins>
+ </build>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver</artifactId>
+ <version>3.10.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.26</version>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.0-beta</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.13</version>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ <version>1.0.0-alpha</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ <version>4.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
new file mode 100644
index 0000000..18a834f
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java
@@ -0,0 +1,157 @@
+package org.apache.connect.mongo;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Settings for the MongoDB replicator.
+ *
+ * <p>Holds the connection parameters, the database/collection interest
+ * lists, the saved replication position and the copy thread count. Values
+ * are filled in from a {@link KeyValue} by {@link #load(KeyValue)}, which
+ * maps each key onto the public setter of the same name.
+ */
+public class MongoReplicatorConfig {
+
+    /** Configuration keys that must be present; exposed as a read-only set. */
+    public static final Set<String> REQUEST_CONFIG = requiredKeys();
+
+    private String mongoAddr;
+    private int mongoPort;
+    private String mongoUserName;
+    private String mongoPassWord;
+    private String interestDB;
+    private String interestCollection;
+    private long positionTimeStamp;
+    private int positionInc;
+    private String dataSync;
+    /** Defaults to the number of processors available to the JVM. */
+    private int copyThread = Runtime.getRuntime().availableProcessors();
+
+    /** Builds the required-key set without an anonymous HashSet subclass. */
+    private static Set<String> requiredKeys() {
+        Set<String> keys = new HashSet<>();
+        keys.add("mongoAddr");
+        keys.add("mongoPort");
+        return java.util.Collections.unmodifiableSet(keys);
+    }
+
+    public int getPositionInc() {
+        return positionInc;
+    }
+
+    public void setPositionInc(int positionInc) {
+        this.positionInc = positionInc;
+    }
+
+    public int getCopyThread() {
+        return copyThread;
+    }
+
+    public void setCopyThread(int copyThread) {
+        this.copyThread = copyThread;
+    }
+
+    public long getPositionTimeStamp() {
+        return positionTimeStamp;
+    }
+
+    public void setPositionTimeStamp(long positionTimeStamp) {
+        this.positionTimeStamp = positionTimeStamp;
+    }
+
+    public String getInterestDB() {
+        return interestDB;
+    }
+
+    public void setInterestDB(String interestDB) {
+        this.interestDB = interestDB;
+    }
+
+    public String getInterestCollection() {
+        return interestCollection;
+    }
+
+    public void setInterestCollection(String interestCollection) {
+        this.interestCollection = interestCollection;
+    }
+
+    public String getMongoAddr() {
+        return mongoAddr;
+    }
+
+    public void setMongoAddr(String mongoAddr) {
+        this.mongoAddr = mongoAddr;
+    }
+
+    public int getMongoPort() {
+        return mongoPort;
+    }
+
+    public void setMongoPort(int mongoPort) {
+        this.mongoPort = mongoPort;
+    }
+
+    public String getMongoUserName() {
+        return mongoUserName;
+    }
+
+    public void setMongoUserName(String mongoUserName) {
+        this.mongoUserName = mongoUserName;
+    }
+
+    public String getMongoPassWord() {
+        return mongoPassWord;
+    }
+
+    public void setMongoPassWord(String mongoPassWord) {
+        this.mongoPassWord = mongoPassWord;
+    }
+
+    public String getDataSync() {
+        return dataSync;
+    }
+
+    public void setDataSync(String dataSync) {
+        this.dataSync = dataSync;
+    }
+
+    /**
+     * Copies every value of {@code props} that has a matching setter into
+     * this object.
+     *
+     * @param props key/value configuration; a key {@code fooBar} is applied
+     *              through {@code setFooBar}
+     */
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    /**
+     * Applies the entries of {@code p} to the public single-argument setters
+     * of {@code object}. Keys without a value, setters with an unsupported
+     * parameter type and values that fail to parse are skipped.
+     */
+    private void properties2Object(final KeyValue p, final Object object) {
+        for (Method method : object.getClass().getMethods()) {
+            String mn = method.getName();
+            // Only "setXxx" methods taking exactly one argument can be fed a value.
+            if (!mn.startsWith("set") || mn.length() <= 3 || method.getParameterCount() != 1) {
+                continue;
+            }
+            // Locale.ROOT keeps "InterestDB" -> "interestDB" stable under every default locale.
+            String key = mn.substring(3, 4).toLowerCase(java.util.Locale.ROOT) + mn.substring(4);
+            try {
+                String property = p.getString(key);
+                if (property == null) {
+                    continue;
+                }
+                Object arg = convert(property, method.getParameterTypes()[0]);
+                if (arg != null) {
+                    method.invoke(object, arg);
+                }
+            } catch (ReflectiveOperationException | RuntimeException ignored) {
+                // Best effort: a malformed or inaccessible entry leaves the field
+                // at its default. Errors (e.g. OutOfMemoryError) are not swallowed.
+            }
+        }
+    }
+
+    /**
+     * Converts {@code value} to the given setter parameter type.
+     *
+     * @return the converted value, or {@code null} when the type is not supported
+     * @throws NumberFormatException when a numeric value cannot be parsed
+     */
+    private static Object convert(String value, Class<?> type) {
+        if (type == int.class || type == Integer.class) {
+            return Integer.parseInt(value);
+        }
+        if (type == long.class || type == Long.class) {
+            return Long.parseLong(value);
+        }
+        if (type == double.class || type == Double.class) {
+            return Double.parseDouble(value);
+        }
+        if (type == boolean.class || type == Boolean.class) {
+            return Boolean.parseBoolean(value);
+        }
+        if (type == float.class || type == Float.class) {
+            return Float.parseFloat(value);
+        }
+        if (type == String.class) {
+            return value;
+        }
+        return null;
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
new file mode 100644
index 0000000..9e659c9
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -0,0 +1,61 @@
+package org.apache.connect.mongo.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MongoSourceConnector extends SourceConnector {
+
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+ private KeyValue keyValueConfig;
+
+
+ @Override
+ public String verifyAndSetConfig(KeyValue config) {
+ for (String requestKey : MongoReplicatorConfig.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
+ return "Request config key: " + requestKey;
+ }
+ }
+ this.keyValueConfig = config;
+ return "";
+ }
+
+ @Override
+ public void start() {
+ logger.info("start mongo source connector:{}", keyValueConfig);
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return MongoSourceTask.class;
+ }
+
+ @Override
+ public List<KeyValue> taskConfigs() {
+ List<KeyValue> config = new ArrayList<>();
+ config.add(this.keyValueConfig);
+ return config;
+ }
+}
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
new file mode 100644
index 0000000..e116cfb
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -0,0 +1,148 @@
+package org.apache.connect.mongo.connector;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.source.SourceTask;
+import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.apache.connect.mongo.replicator.MongoReplicator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Source task that drains {@link ReplicationEvent}s from the queue of a
+ * {@link MongoReplicator} and converts each one into a {@link SourceDataEntry}.
+ */
+public class MongoSourceTask extends SourceTask {
+
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private MongoReplicator mongoReplicator;
+
+    private MongoReplicatorConfig replicatorConfig;
+
+    /** Partition key for stored positions: mongo address immediately followed by the port. */
+    private String mongoSource;
+
+    /**
+     * Takes at most one event from the replicator queue without blocking.
+     *
+     * @return a list holding the converted entry, or an empty list when no
+     *         event is queued or the task was never started successfully
+     */
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        List<SourceDataEntry> res = new ArrayList<>();
+        if (mongoReplicator == null) {
+            return res;
+        }
+        ReplicationEvent event = mongoReplicator.getQueue().poll();
+        if (event == null) {
+            return res;
+        }
+
+        JSONObject position = position(event);
+        Schema schema = new Schema();
+        schema.setDataSource(event.getDatabaseName());
+        schema.setName(event.getCollectionName());
+        schema.setFields(new ArrayList<>());
+        buildFieleds(schema);
+        DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+        dataEntryBuilder.timestamp(System.currentTimeMillis())
+            .queue(event.getNamespace())
+            .entryType(event.getEntryType());
+
+        if (event.getOperationType() == OperationType.CREATED) {
+            // Initial-sync events carry the whole document.
+            dataEntryBuilder.putFiled(Constants.CREATED, event.getDocument().toJson());
+            dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace());
+        } else {
+            dataEntryBuilder.putFiled(Constants.OPERATIONTYPE, event.getOperationType().name());
+            dataEntryBuilder.putFiled(Constants.TIMESTAMP, event.getTimestamp().getValue());
+            dataEntryBuilder.putFiled(Constants.VERSION, event.getV());
+            dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace());
+            dataEntryBuilder.putFiled(Constants.PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
+            dataEntryBuilder.putFiled(Constants.OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
+        }
+        SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+            ByteBuffer.wrap(mongoSource.getBytes(StandardCharsets.UTF_8)),
+            ByteBuffer.wrap(position.toJSONString().getBytes(StandardCharsets.UTF_8)));
+        res.add(sourceDataEntry);
+        return res;
+    }
+
+    /**
+     * Loads the configuration, restores the stored position when there is
+     * one (otherwise requests an initial sync) and starts the replicator.
+     * A failure is logged and the replicator is shut down; on success the
+     * replicator is left running.
+     *
+     * @param config task configuration
+     */
+    @Override
+    public void start(KeyValue config) {
+        try {
+            replicatorConfig = new MongoReplicatorConfig();
+            replicatorConfig.load(config);
+            mongoReplicator = new MongoReplicator(replicatorConfig);
+            mongoSource = new StringBuilder()
+                .append(replicatorConfig.getMongoAddr())
+                .append(replicatorConfig.getMongoPort()).toString();
+            // Same charset as poll(), so the partition key is identical when reading and writing.
+            ByteBuffer position = this.context.positionStorageReader().getPosition(
+                ByteBuffer.wrap(mongoSource.getBytes(StandardCharsets.UTF_8)));
+
+            if (position != null && position.array().length > 0) {
+                String positionJson = new String(position.array(), StandardCharsets.UTF_8);
+                JSONObject jsonObject = JSONObject.parseObject(positionJson);
+                // Read with the same keys position(ReplicationEvent) writes.
+                replicatorConfig.setPositionTimeStamp(jsonObject.getLongValue(Constants.POSITION_TIMESTAMP));
+                replicatorConfig.setPositionInc(jsonObject.getIntValue(Constants.POSITION_INC));
+            } else {
+                replicatorConfig.setDataSync(Constants.INITIAL);
+            }
+            mongoReplicator.start();
+        } catch (Throwable throwable) {
+            logger.error("task start error", throwable);
+            // Release whatever was created; only done on failure.
+            stop();
+        }
+    }
+
+    /** Shuts the replicator down; safe to call when start never created it. */
+    @Override
+    public void stop() {
+        logger.info("shut down.....");
+        if (mongoReplicator != null) {
+            mongoReplicator.shutdown();
+        }
+    }
+
+    /** Pauses the replicator, if one exists. */
+    @Override
+    public void pause() {
+        if (mongoReplicator != null) {
+            mongoReplicator.pause();
+        }
+    }
+
+    /** Resumes the replicator, if one exists. */
+    @Override
+    public void resume() {
+        if (mongoReplicator != null) {
+            mongoReplicator.resume();
+        }
+    }
+
+    /** Appends the seven entry fields to {@code schema}, indexed 0..6. */
+    private void buildFieleds(Schema schema) {
+        Field op = new Field(0, Constants.OPERATIONTYPE, FieldType.STRING);
+        schema.getFields().add(op);
+        Field time = new Field(1, Constants.TIMESTAMP, FieldType.INT64);
+        schema.getFields().add(time);
+        Field v = new Field(2, Constants.VERSION, FieldType.INT32);
+        schema.getFields().add(v);
+        Field namespace = new Field(3, Constants.NAMESPACE, FieldType.STRING);
+        schema.getFields().add(namespace);
+        Field operation = new Field(4, Constants.CREATED, FieldType.STRING);
+        schema.getFields().add(operation);
+        Field patch = new Field(5, Constants.PATCH, FieldType.STRING);
+        schema.getFields().add(patch);
+        // Index 6: the previous value 5 collided with the patch field.
+        Field objectId = new Field(6, Constants.OBJECTID, FieldType.STRING);
+        schema.getFields().add(objectId);
+    }
+
+    /**
+     * Builds the position JSON stored alongside an entry.
+     *
+     * <p>NOTE(review): both branches record timestamp 0, inc 0 and
+     * initSync=true, so the stored position never advances; the default
+     * branch presumably should record the event's oplog timestamp - confirm
+     * against ReplicationEvent before changing.
+     */
+    private JSONObject position(ReplicationEvent event) {
+        JSONObject jsonObject = new JSONObject();
+        switch (event.getOperationType()) {
+            case CREATED:
+                jsonObject.put(Constants.POSITION_TIMESTAMP, 0);
+                jsonObject.put(Constants.POSITION_INC, 0);
+                jsonObject.put(Constants.INITSYNC, true);
+                break;
+            default:
+                jsonObject.put(Constants.POSITION_TIMESTAMP, 0);
+                jsonObject.put(Constants.POSITION_INC, 0);
+                jsonObject.put(Constants.INITSYNC, true);
+                break;
+        }
+        return jsonObject;
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
new file mode 100644
index 0000000..018418c
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
@@ -0,0 +1,41 @@
+package org.apache.connect.mongo.initsync;
+
+public class CollectionMeta {
+
+ private String databaseName;
+ private String collectionName;
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public void setDatabaseName(String databaseName) {
+ this.databaseName = databaseName;
+ }
+
+ public String getCollectionName() {
+ return collectionName;
+ }
+
+ public void setCollectionName(String collectionName) {
+ this.collectionName = collectionName;
+ }
+
+ public CollectionMeta(String databaseName, String collectionName) {
+
+ this.databaseName = databaseName;
+ this.collectionName = collectionName;
+ }
+
+ public String getNameSpace() {
+ return databaseName + "." + collectionName;
+ }
+
+ @Override
+ public String toString() {
+ return "CollectionMeta{" +
+ "databaseName='" + databaseName + '\'' +
+ ", collectionName='" + collectionName + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
new file mode 100644
index 0000000..4b3a238
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -0,0 +1,146 @@
+package org.apache.connect.mongo.initsync;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoIterable;
+import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.apache.connect.mongo.replicator.Filter;
+import org.apache.connect.mongo.replicator.MongoReplicator;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InitSync {
+
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private MongoReplicatorConfig mongoReplicatorConfig;
+ private ExecutorService copyExecutor;
+ private MongoClient mongoClient;
+ private Filter filter;
+ private int copyThreadCount;
+ private Set<String> interestDataBases;
+ private Set<CollectionMeta> interestCollections;
+ private CountDownLatch countDownLatch;
+ private MongoReplicator mongoReplicator;
+
+ public InitSync(MongoReplicatorConfig mongoReplicatorConfig, MongoClient mongoClient, Filter filter, MongoReplicator mongoReplicator) {
+ this.mongoReplicatorConfig = mongoReplicatorConfig;
+ this.mongoClient = mongoClient;
+ this.filter = filter;
+ this.mongoReplicator = mongoReplicator;
+ init();
+ }
+
+ public void start() {
+ for (CollectionMeta collectionMeta : interestCollections) {
+ copyExecutor.submit(new CopyRunner(mongoClient, countDownLatch, collectionMeta, mongoReplicator));
+ }
+ try {
+ countDownLatch.await();
+ } catch (Exception e) {
+ } finally {
+ copyExecutor.shutdown();
+ }
+ }
+
+ private void init() {
+ interestDataBases = getInterestDataBase();
+ interestCollections = getInterestCollection(interestDataBases);
+ copyThreadCount = Math.min(interestCollections.size(), mongoReplicatorConfig.getCopyThread());
+ copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() {
+
+ AtomicInteger threads = new AtomicInteger();
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "copy_collection_thread_" + threads.incrementAndGet());
+ }
+ });
+ countDownLatch = new CountDownLatch(interestCollections.size());
+ }
+
+ private Set<CollectionMeta> getInterestCollection(Set<String> interestDataBases) {
+ Set<CollectionMeta> res = new HashSet<>();
+ for (String interestDataBase : interestDataBases) {
+ MongoIterable<String> collectionNames = mongoClient.getDatabase(interestDataBase).listCollectionNames();
+ MongoCursor<String> iterator = collectionNames.iterator();
+ while (iterator.hasNext()) {
+ String collectionName = iterator.next();
+ if (filter.filterCollectionName(collectionName)) {
+ CollectionMeta collectionMeta = new CollectionMeta(interestDataBase, collectionName);
+ res.add(collectionMeta);
+ }
+ }
+ }
+
+ return res;
+
+ }
+
+ private Set<String> getInterestDataBase() {
+ Set<String> res = new HashSet<>();
+ MongoIterable<String> databaseNames = mongoClient.listDatabaseNames();
+ MongoCursor<String> iterator = databaseNames.iterator();
+ while (iterator.hasNext()) {
+ String dataBaseName = iterator.next();
+ if (filter.filterDatabaseName(dataBaseName)) {
+ res.add(dataBaseName);
+ }
+ }
+
+ return res;
+ }
+
+ class CopyRunner implements Runnable {
+
+ private MongoClient mongoClient;
+ private CountDownLatch countDownLatch;
+ private CollectionMeta collectionMeta;
+ private MongoReplicator mongoReplicator;
+
+ public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, MongoReplicator mongoReplicator) {
+ this.mongoClient = mongoClient;
+ this.countDownLatch = countDownLatch;
+ this.collectionMeta = collectionMeta;
+ this.mongoReplicator = mongoReplicator;
+ }
+
+ @Override
+ public void run() {
+
+ try {
+
+ MongoCursor<Document> mongoCursor = mongoClient.getDatabase(collectionMeta.getDatabaseName())
+ .getCollection(collectionMeta.getCollectionName())
+ .find()
+ .batchSize(200)
+ .iterator();
+
+ while (mongoReplicator.isRuning() && mongoCursor.hasNext()) {
+ Document document = mongoCursor.next();
+ ReplicationEvent event = DocumentConvertEvent.convert(document);
+ event.setOperationType(OperationType.CREATED);
+ event.setNamespace(collectionMeta.getNameSpace());
+ mongoReplicator.publishEvent(event);
+ }
+ } finally {
+ countDownLatch.countDown();
+ }
+ logger.info("database:{}, collection:{}, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName());
+ }
+ }
+
+}
+
+
+
+
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
new file mode 100644
index 0000000..05dec54
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -0,0 +1,74 @@
+package org.apache.connect.mongo.replicator;
+
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+
+import java.util.function.Function;
+
+/**
+ * Decides which databases, collections and events are replicated.
+ *
+ * <p>The database and collection interest lists are comma-separated names
+ * read from the {@link MongoReplicatorConfig} once, at construction time. A
+ * blank list accepts every name, and a blank name is always accepted.
+ */
+public class Filter {
+
+    private Function<String, Boolean> dbFilter;
+    private Function<String, Boolean> collectionFilter;
+    private Function<OperationType, Boolean> noopFilter;
+
+    /**
+     * @param mongoReplicatorConfig source of the interest lists
+     */
+    public Filter(MongoReplicatorConfig mongoReplicatorConfig) {
+        dbFilter = nameFilter(mongoReplicatorConfig.getInterestDB());
+        collectionFilter = nameFilter(mongoReplicatorConfig.getInterestCollection());
+        noopFilter = (operationType) -> operationType != OperationType.NOOP;
+    }
+
+    /**
+     * Builds a name filter from a comma-separated interest list. The list is
+     * split once here rather than on every call.
+     */
+    private static Function<String, Boolean> nameFilter(String interest) {
+        if (StringUtils.isBlank(interest)) {
+            return (name) -> true;
+        }
+        final String[] accepted = StringUtils.split(interest, ",");
+        return (name) -> StringUtils.isBlank(name) || ArrayUtils.contains(accepted, name);
+    }
+
+    /** @return {@code true} when the database should be replicated */
+    public boolean filterDatabaseName(String dataBaseName) {
+        return dbFilter.apply(dataBaseName);
+    }
+
+    /** @return {@code true} when the collection should be replicated */
+    public boolean filterCollectionName(String collectionName) {
+        return collectionFilter.apply(collectionName);
+    }
+
+    /**
+     * @return {@code true} when the event's database and collection are both
+     *         accepted and its operation type is not {@code NOOP}
+     */
+    public boolean filterEvent(ReplicationEvent event) {
+        return dbFilter.apply(event.getDatabaseName())
+            && collectionFilter.apply(event.getCollectionName())
+            && noopFilter.apply(event.getOperationType());
+    }
+
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
new file mode 100644
index 0000000..326ba03
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java
@@ -0,0 +1,152 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.*;
+import com.mongodb.client.*;
+import com.mongodb.client.MongoClient;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.connect.mongo.replicator.Constants.*;
+
+
+public class MongoReplicator {
+
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private AtomicBoolean running = new AtomicBoolean();
+
+ private MongoReplicatorConfig mongoReplicatorConfig;
+
+ private MongoClientSettings clientSettings;
+
+ private ConnectionString connectionString;
+
+ private MongoClient mongoClient;
+
+ private BlockingQueue<ReplicationEvent> queue = new LinkedBlockingQueue<>();
+
+ private Filter filter;
+
+ private ExecutorService executorService;
+
+ private volatile boolean pause = false;
+
+ public MongoReplicator(MongoReplicatorConfig mongoReplicatorConfig) {
+ this.mongoReplicatorConfig = mongoReplicatorConfig;
+ this.filter = new Filter(mongoReplicatorConfig);
+ this.executorService = Executors.newSingleThreadExecutor((r) ->new Thread(r, "real_time_replica_thread"));
+
+ buildConnectionString();
+ }
+
+ public void start() {
+
+ try {
+ if (!running.compareAndSet(false, true)) {
+ logger.info("the java mongo replica already start");
+ return;
+ }
+
+ this.clientSettings = MongoClientSettings.builder().applicationName(APPLICATION_NAME)
+ .applyConnectionString(connectionString)
+ .retryWrites(true)
+ .build();
+ this.mongoClient = MongoClients.create(clientSettings);
+ this.isReplicaMongo();
+ executorService.submit(new ReplicatorTask(this, mongoClient, mongoReplicatorConfig, filter));
+ }catch (Exception e) {
+ logger.info("start replicator error", e);
+ }finally {
+ shutdown();
+ }
+
+ }
+
+
+ private void buildConnectionString() {
+ checkConfig();
+ StringBuilder sb = new StringBuilder();
+ sb.append("mongodb://");
+ if (StringUtils.isNotBlank(mongoReplicatorConfig.getMongoUserName())
+ && StringUtils.isNotBlank(mongoReplicatorConfig.getMongoPassWord())) {
+ sb.append(mongoReplicatorConfig.getMongoUserName());
+ sb.append(":");
+ sb.append(mongoReplicatorConfig.getMongoPassWord());
+ sb.append("@");
+
+ }
+ sb.append(mongoReplicatorConfig.getMongoAddr());
+ sb.append(":");
+ sb.append(mongoReplicatorConfig.getMongoPort());
+
+ this.connectionString = new ConnectionString(sb.toString());
+ }
+
+ private void checkConfig() {
+ Validate.notBlank(mongoReplicatorConfig.getMongoAddr(), "mongo url is blank");
+ Validate.isTrue(mongoReplicatorConfig.getMongoPort() > 0 && mongoReplicatorConfig.getMongoPort() < 65535, "mongo port should >0 and <65535");
+
+ }
+
+ private boolean isReplicaMongo() {
+ MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
+ MongoIterable<String> collectionNames = local.listCollectionNames();
+ for (String collectionName : collectionNames) {
+ if (MONGO_OPLOG_RS.equals(collectionName)) {
+ return true;
+ }
+ }
+ this.shutdown();
+ throw new IllegalStateException(String.format("url:%s, port:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getMongoPort()));
+ }
+
+ public void shutdown() {
+ if (running.compareAndSet(true, false)) {
+ if (!this.executorService.isShutdown()) {
+ executorService.shutdown();
+ }
+ if (this.mongoClient != null) {
+ this.mongoClient.close();
+ }
+ }
+
+ }
+
+ public void publishEvent(ReplicationEvent replicationEvent) {
+ while (true) {
+ try {
+ queue.put(replicationEvent);
+ break;
+ } catch (Exception e) {
+ }
+ }
+ }
+
+
+ public void pause() {
+ pause = true;
+ }
+
+ public void resume() {
+ pause = false;
+ }
+
+ public boolean isPause() {
+ return pause;
+ }
+
+ public boolean isRuning() {
+ return running.get();
+ }
+
+ public BlockingQueue<ReplicationEvent> getQueue() {
+ return queue;
+ }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
new file mode 100644
index 0000000..9e4c67b
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -0,0 +1,84 @@
+package org.apache.connect.mongo.replicator;
+
+import com.mongodb.CursorType;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import org.apache.connect.mongo.MongoReplicatorConfig;
+import org.apache.connect.mongo.replicator.event.DocumentConvertEvent;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.apache.connect.mongo.initsync.InitSync;
+import org.bson.Document;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ReplicatorTask implements Runnable {
+
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private MongoReplicator mongoReplicator;
+
+ private MongoClient mongoClient;
+
+ private MongoReplicatorConfig mongoReplicatorConfig;
+
+ private Filter filter;
+
+ public ReplicatorTask(MongoReplicator mongoReplicator, MongoClient mongoClient, MongoReplicatorConfig mongoReplicatorConfig, Filter filter) {
+ this.mongoReplicator = mongoReplicator;
+ this.mongoReplicatorConfig = mongoReplicatorConfig;
+ this.mongoClient = mongoClient;
+ this.filter = filter;
+ }
+
+ @Override
+ public void run() {
+
+ if (Constants.INITIAL.equals(mongoReplicatorConfig.getDataSync())) {
+ InitSync initSync = new InitSync(mongoReplicatorConfig, mongoClient, filter, mongoReplicator);
+ initSync.start();
+ }
+
+ MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
+ FindIterable<Document> iterable;
+ if (mongoReplicatorConfig.getPositionTimeStamp() > 0 && mongoReplicatorConfig.getPositionTimeStamp() < System.currentTimeMillis()) {
+ iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
+ Filters.gt("ts", mongoReplicatorConfig.getPositionTimeStamp()));
+ } else {
+ iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
+ }
+ MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1))
+ .noCursorTimeout(true)
+ .cursorType(CursorType.TailableAwait)
+ .batchSize(200)
+ .iterator();
+
+ while (mongoReplicator.isRuning()) {
+ try {
+ executorCursor(cursor);
+ Thread.sleep(100);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ logger.error("mongoReplicator shutdown.....");
+ mongoReplicator.shutdown();
+ }
+ }
+ }
+
+
+ private void executorCursor(MongoCursor<Document> cursor) {
+ while (cursor.hasNext() && !mongoReplicator.isPause()) {
+ Document document = cursor.next();
+ ReplicationEvent event = DocumentConvertEvent.convert(document);
+ if (filter.filterEvent(event)) {
+ mongoReplicator.publishEvent(event);
+ }
+ }
+ }
+
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
new file mode 100644
index 0000000..24f36d9
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -0,0 +1,67 @@
+package org.apache.connect.mongo;
+
+import org.apache.connect.mongo.replicator.Filter;
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the database-name, collection-name and event checks of {@link Filter}.
+ */
+public class FilterTest {
+
+    private MongoReplicatorConfig config;
+
+    /** Gives every test a fresh, empty configuration. */
+    @Before
+    public void init() {
+        config = new MongoReplicatorConfig();
+    }
+
+    /** With an interest-DB list, a listed name passes and an unlisted one is rejected. */
+    @Test
+    public void testSpecialDb() {
+        config.setInterestDB("test,admin");
+        Filter dbFilter = new Filter(config);
+        Assert.assertTrue(dbFilter.filterDatabaseName("test"));
+        Assert.assertFalse(dbFilter.filterDatabaseName("test01"));
+    }
+
+    /** Without an interest-DB list, both sample database names pass. */
+    @Test
+    public void testBlankDb() {
+        Filter dbFilter = new Filter(config);
+        Assert.assertTrue(dbFilter.filterDatabaseName("test"));
+        Assert.assertTrue(dbFilter.filterDatabaseName("test01"));
+    }
+
+    /** With an interest-collection list, a listed name passes and an unlisted one is rejected. */
+    @Test
+    public void testSpecialCollection() {
+        config.setInterestCollection("test,admin");
+        Filter collectionFilter = new Filter(config);
+        Assert.assertTrue(collectionFilter.filterCollectionName("test"));
+        Assert.assertFalse(collectionFilter.filterCollectionName("test01"));
+    }
+
+    /** Without an interest-collection list, both sample collection names pass. */
+    @Test
+    public void testBlankCollection() {
+        Filter collectionFilter = new Filter(config);
+        Assert.assertTrue(collectionFilter.filterCollectionName("test"));
+        Assert.assertTrue(collectionFilter.filterCollectionName("test01"));
+    }
+
+    /** A NOOP event is rejected; the same event re-typed as DBCOMMAND is accepted. */
+    @Test
+    public void testFilterEvent() {
+        Filter eventFilter = new Filter(config);
+        ReplicationEvent event = new ReplicationEvent();
+        event.setOperationType(OperationType.NOOP);
+        Assert.assertFalse(eventFilter.filterEvent(event));
+        event.setOperationType(OperationType.DBCOMMAND);
+        Assert.assertTrue(eventFilter.filterEvent(event));
+    }
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
new file mode 100644
index 0000000..04e1374
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -0,0 +1,45 @@
+package org.apache.connect.mongo;
+
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.connect.mongo.connector.MongoSourceConnector;
+import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for the task class and configuration check of {@link MongoSourceConnector}.
+ */
+public class MongoSourceConnectorTest {
+
+    private MongoSourceConnector mongoSourceConnector;
+    private DefaultKeyValue keyValue;
+
+    /** Creates a fresh connector and an empty key/value configuration for each test. */
+    @Before
+    public void before() {
+        mongoSourceConnector = new MongoSourceConnector();
+        keyValue = new DefaultKeyValue();
+    }
+
+    /** The connector must report {@link MongoSourceTask} as its task class. */
+    @Test
+    public void takeClass() {
+        // JUnit expects (expected, actual); the reverse order yields a misleading failure message.
+        Assert.assertEquals(MongoSourceTask.class, mongoSourceConnector.taskClass());
+    }
+
+    /**
+     * With only the address set the check returns a "Request config key:" message;
+     * once the port is added it returns a blank result.
+     */
+    @Test
+    public void verifyConfig() {
+        keyValue.put("mongoAddr", "127.0.0.1");
+        String s = mongoSourceConnector.verifyAndSetConfig(keyValue);
+        Assert.assertTrue(s.contains("Request config key:"));
+        keyValue.put("mongoPort", 27017);
+        s = mongoSourceConnector.verifyAndSetConfig(keyValue);
+        Assert.assertTrue(StringUtils.isBlank(s));
+    }
+
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicatorTest.java b/src/test/java/org/apache/connect/mongo/ReplicatorTest.java
new file mode 100644
index 0000000..b0319f1
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/ReplicatorTest.java
@@ -0,0 +1,33 @@
+package org.apache.connect.mongo;
+
+import org.apache.connect.mongo.replicator.MongoReplicator;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Start-up tests for {@link MongoReplicator}.
+ */
+public class ReplicatorTest {
+
+    private MongoReplicatorConfig config;
+
+    /** Gives every test a fresh, empty configuration. */
+    @Before
+    public void before() {
+        config = new MongoReplicatorConfig();
+    }
+
+    /** Starting with an address but no port is expected to raise IllegalArgumentException. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testNoPort() {
+        config.setMongoAddr("127.0.0.1");
+        MongoReplicator mongoReplicator = new MongoReplicator(config);
+        mongoReplicator.start();
+    }
+
+    /** Starts a replicator with both address and port set, then always stops it again. */
+    @Test
+    public void testNoPort1() {
+        config.setMongoAddr("127.0.0.1");
+        config.setMongoPort(27012);
+        MongoReplicator mongoReplicator = new MongoReplicator(config);
+        try {
+            mongoReplicator.start();
+        } finally {
+            // Do not leave the executor or client of a started replicator behind.
+            mongoReplicator.shutdown();
+        }
+    }
+
+}