You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/27 05:45:52 UTC
[rocketmq-connect] branch master updated: [ISSUE #312] mongo source connector adapt to new api 0.1.4 (#313)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 21f13c56 [ISSUE #312] mongo source connector adapt to new api 0.1.4 (#313)
21f13c56 is described below
commit 21f13c56c5d07d390d1a31c8a741c269316e905d
Author: Oliver <wq...@163.com>
AuthorDate: Tue Sep 27 13:45:47 2022 +0800
[ISSUE #312] mongo source connector adapt to new api 0.1.4 (#313)
---
connectors/rocketmq-connect-mongo/pom.xml | 22 ++-
.../org/apache/connect/mongo/SourceTaskConfig.java | 20 +++
.../mongo/connector/MongoSourceConnector.java | 33 ++--
.../connect/mongo/connector/MongoSourceTask.java | 49 +++---
.../mongo/connector/builder/MongoDataEntry.java | 173 ++++++++++-----------
.../apache/connect/mongo/initsync/InitSync.java | 3 -
.../apache/connect/mongo/replicator/Constants.java | 19 +++
.../mongo/replicator/MongoClientFactory.java | 15 +-
.../apache/connect/mongo/replicator/Position.java | 2 +-
.../connect/mongo/replicator/ReplicaSet.java | 19 ++-
.../connect/mongo/replicator/ReplicaSetConfig.java | 10 ++
.../mongo/replicator/ReplicaSetsContext.java | 30 ++--
.../connect/mongo/replicator/ReplicatorTask.java | 4 +-
.../mongo/replicator/event/OperationType.java | 4 +
.../mongo/replicator/event/ReplicationEvent.java | 14 --
.../org/apache/connect/mongo/MongoFactoryTest.java | 23 +--
.../connect/mongo/MongoSourceConnectorTest.java | 58 +++----
.../apache/connect/mongo/MongoSourceTaskTest.java | 86 ++++------
.../java/org/apache/connect/mongo/MongoTest.java | 47 +++---
.../apache/connect/mongo/ReplicaContextTest.java | 2 +-
.../org/apache/connect/mongo/ReplicaSetTest.java | 2 +-
.../connect/mongo/TestPositionStorageReader.java | 35 +++++
.../connect/runtime/config/WorkerConfig.java | 2 +-
23 files changed, 351 insertions(+), 321 deletions(-)
diff --git a/connectors/rocketmq-connect-mongo/pom.xml b/connectors/rocketmq-connect-mongo/pom.xml
index 2fc00d13..9f9c1be5 100644
--- a/connectors/rocketmq-connect-mongo/pom.xml
+++ b/connectors/rocketmq-connect-mongo/pom.xml
@@ -140,6 +140,24 @@
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
@@ -148,7 +166,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
- <version>3.10.1</version>
+ <version>3.12.11</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -163,7 +181,7 @@
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
- <version>0.1.1</version>
+ <version>0.1.4</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
index d184b5ce..10f9a57c 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
@@ -47,6 +47,26 @@ public class SourceTaskConfig {
private String trustStorePassword;
private int copyThread = Runtime.getRuntime().availableProcessors();
+ private long maxConnectionIdleTime;
+
+ private boolean socketKeepAlive;
+
+ public boolean getSocketKeepAlive() {
+ return socketKeepAlive;
+ }
+
+ public void setSocketKeepAlive(boolean socketKeepAlive) {
+ this.socketKeepAlive = socketKeepAlive;
+ }
+
+ public long getMaxConnectionIdleTime() {
+ return maxConnectionIdleTime;
+ }
+
+ public void setMaxConnectionIdleTime(long maxConnectionIdleTime) {
+ this.maxConnectionIdleTime = maxConnectionIdleTime;
+ }
+
public static final Set<String> REQUEST_CONFIG = Collections.unmodifiableSet(new HashSet<String>() {
{
add("mongoAddr");
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
index 5be2e0d1..9939de4e 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -18,8 +18,8 @@
package org.apache.connect.mongo.connector;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.Task;
-import io.openmessaging.connector.api.source.SourceConnector;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
import java.util.ArrayList;
import java.util.List;
import org.apache.connect.mongo.SourceTaskConfig;
@@ -32,34 +32,26 @@ public class MongoSourceConnector extends SourceConnector {
private KeyValue keyValueConfig;
@Override
- public String verifyAndSetConfig(KeyValue config) {
+ public void start(KeyValue config) {
for (String requestKey : SourceTaskConfig.REQUEST_CONFIG) {
if (!config.containsKey(requestKey)) {
- return "Request config key: " + requestKey;
+ throw new RuntimeException("Request config key: " + requestKey);
}
}
this.keyValueConfig = config;
- return "";
- }
-
- @Override
- public void start() {
- logger.info("start mongo source connector:{}", keyValueConfig);
}
@Override
public void stop() {
-
+ this.keyValueConfig = null;
}
- @Override
- public void pause() {
- }
-
- @Override
- public void resume() {
+ @Override public List<KeyValue> taskConfigs(int maxTasks) {
+ List<KeyValue> config = new ArrayList<>();
+ config.add(this.keyValueConfig);
+ return config;
}
@Override
@@ -67,10 +59,5 @@ public class MongoSourceConnector extends SourceConnector {
return MongoSourceTask.class;
}
- @Override
- public List<KeyValue> taskConfigs() {
- List<KeyValue> config = new ArrayList<>();
- config.add(this.keyValueConfig);
- return config;
- }
+
}
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index 49bcf49a..c913a3e5 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -17,14 +17,16 @@
package org.apache.connect.mongo.connector;
-import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import io.openmessaging.connector.api.source.SourceTask;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.connect.mongo.SourceTaskConfig;
+import org.apache.connect.mongo.replicator.Constants;
import org.apache.connect.mongo.replicator.Position;
import org.apache.connect.mongo.replicator.ReplicaSet;
import org.apache.connect.mongo.replicator.ReplicaSetManager;
@@ -43,7 +45,7 @@ public class MongoSourceTask extends SourceTask {
private ReplicaSetsContext replicaSetsContext;
@Override
- public Collection<SourceDataEntry> poll() {
+ public List<ConnectRecord> poll() {
return replicaSetsContext.poll();
}
@@ -59,20 +61,18 @@ public class MongoSourceTask extends SourceTask {
replicaSetManager = ReplicaSetManager.create(sourceTaskConfig.getMongoAddr());
replicaSetManager.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
- ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
- replicaSetName.getBytes()));
- if (byteBuffer != null && byteBuffer.array().length > 0) {
- String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8);
- Position position = JSONObject.parseObject(positionJson, Position.class);
+ final RecordOffset recordOffset = this.sourceTaskContext.offsetStorageReader().readOffset(this.buildRecordPartition(replicaSetName));
+ if (recordOffset != null && recordOffset.getOffset().size() > 0) {
+ final Map<String, Object> offset = (Map<String, Object>) recordOffset.getOffset();
+ Position position = new Position();
+ position.setTimeStamp((int) offset.get(Constants.TIMESTAMP));
replicaSetConfig.setPosition(position);
} else {
Position position = new Position();
position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp());
- position.setInc(sourceTaskConfig.getPositionInc());
- position.setInitSync(sourceTaskConfig.isDataSync());
replicaSetConfig.setPosition(position);
}
-
+ replicaSetConfig.setMaxTask(config.getInt(Constants.MAX_TASK));
ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
replicaSetsContext.addReplicaSet(replicaSet);
replicaSet.start();
@@ -84,22 +84,17 @@ public class MongoSourceTask extends SourceTask {
}
}
+ private RecordPartition buildRecordPartition(String replicaSetName) {
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put(Constants.REPLICA_SET_NAME, replicaSetName);
+ RecordPartition recordPartition = new RecordPartition(partitionMap);
+ return recordPartition;
+ }
+
@Override
public void stop() {
logger.info("shut down.....");
replicaSetsContext.shutdown();
}
- @Override
- public void pause() {
- logger.info("pause replica task...");
- replicaSetsContext.pause();
- }
-
- @Override
- public void resume() {
- logger.info("resume replica task...");
- replicaSetsContext.resume();
- }
-
}
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
index 1d6dfe53..730bec08 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
@@ -17,118 +17,115 @@
package org.apache.connect.mongo.connector.builder;
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.internal.DefaultKeyValue;
import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.connect.mongo.replicator.Constants;
import org.apache.connect.mongo.replicator.Position;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
-import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.bson.BsonTimestamp;
-
-import static org.apache.connect.mongo.replicator.Constants.CREATED;
-import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
-import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
-import static org.apache.connect.mongo.replicator.Constants.PATCH;
-import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
-import static org.apache.connect.mongo.replicator.Constants.VERSION;
+import org.bson.Document;
public class MongoDataEntry {
- private static String SCHEMA_CREATED_NAME = "mongo_created";
- private static String SCHEMA_OPLOG_NAME = "mongo_oplog";
-
- public static SourceDataEntry createSouceDataEntry(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
-
- DataEntryBuilder dataEntryBuilder;
-
- if (event.getOperationType().equals(OperationType.CREATED)) {
- Schema schema = createdSchema(replicaSetConfig.getReplicaSetName());
- dataEntryBuilder = new DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
- .entryType(event.getEntryType());
-
- dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
- dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
-
- } else {
- Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName());
- dataEntryBuilder = new DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace().replace(".", "-").replace("$", "-"))
- .entryType(event.getEntryType());
- dataEntryBuilder.putFiled(OPERATION_TYPE, event.getOperationType().name());
- dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue());
- dataEntryBuilder.putFiled(VERSION, event.getV());
- dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
- dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
- dataEntryBuilder.putFiled(OBJECT_ID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
+ public static ConnectRecord createSourceDataEntry(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
+ final Position position = replicaSetConfig.getPosition();
+ final int oldTimestamp = position.getTimeStamp();
+ final BsonTimestamp timestamp = event.getTimestamp();
+ if (oldTimestamp != 0 && timestamp != null && timestamp.getTime() <= oldTimestamp) {
+ return null;
}
-
- String position = createPosition(event, replicaSetConfig);
- SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
- ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
- return sourceDataEntry;
+ Schema schema = SchemaBuilder.struct().name(Constants.MONGO).build();
+ final List<Field> fields = buildFields(event.getDocument());
+ schema.setFields(fields);
+ final ConnectRecord connectRecord = new ConnectRecord(buildRecordPartition(replicaSetConfig),
+ buildRecordOffset(event, replicaSetConfig),
+ System.currentTimeMillis(),
+ schema,
+ buildPayLoad(fields, event, schema));
+ connectRecord.setExtensions(buildExtendFiled(event));
+ return connectRecord;
}
- private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
- Position position = new Position();
- BsonTimestamp timestamp = event.getTimestamp();
- position.setInc(timestamp != null ? timestamp.getInc() : 0);
- position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0);
- position.setInitSync(event.getOperationType().equals(OperationType.CREATED) ? true : false);
- return JSONObject.toJSONString(position);
+ private static RecordPartition buildRecordPartition(ReplicaSetConfig replicaSetConfig) {
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put(Constants.REPLICA_SET_NAME, replicaSetConfig.getReplicaSetName());
+ RecordPartition recordPartition = new RecordPartition(partitionMap);
+ return recordPartition;
+ }
+ private static RecordOffset buildRecordOffset(ReplicationEvent event, ReplicaSetConfig config) {
+ Map<String, Integer> offsetMap = new HashMap<>();
+ final Position position = config.getPosition();
+ offsetMap.put(Constants.TIMESTAMP, event.getTimestamp() != null ? event.getTimestamp().getTime() : position.getTimeStamp());
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+ return recordOffset;
}
- private static Schema createdSchema(String dataSourceName) {
- Schema schema = new Schema();
- schema.setDataSource(dataSourceName);
- schema.setName(SCHEMA_CREATED_NAME);
- schema.setFields(new ArrayList<>());
- createdField(schema);
- return schema;
+ private static List<Field> buildFields(Document document) {
+ List<Field> fields = new ArrayList<>();
+ int i = 0;
+ for (Map.Entry<String, Object> entry : document.entrySet()) {
+ final String key = entry.getKey();
+ final Object value = entry.getValue();
+ fields.add(new Field(i++, key, getSchema(value)));
+ }
+ return fields;
}
- private static Schema oplogSchema(String dataSourceName) {
- Schema schema = new Schema();
- schema.setDataSource(dataSourceName);
- schema.setName(SCHEMA_OPLOG_NAME);
- oplogField(schema);
- return schema;
+ private static Struct buildPayLoad(List<Field> fields, ReplicationEvent event, Schema schema) {
+ Struct payLoad = new Struct(schema);
+ final Document document = event.getDocument();
+ for (Field field : fields) {
+ final Schema valueSchema = field.getSchema();
+ if (valueSchema.getFieldType().equals(FieldType.STRING)) {
+ payLoad.put(field, JSON.toJSONString(document.get(field.getName())));
+ } else {
+ payLoad.put(field, document.get(field.getName()));
+ }
+ }
+ return payLoad;
}
- private static void createdField(Schema schema) {
- Field namespace = new Field(0, NAMESPACE, FieldType.STRING);
- schema.getFields().add(namespace);
- Field operation = new Field(1, Constants.CREATED, FieldType.STRING);
- schema.getFields().add(operation);
+ private static KeyValue buildExtendFiled(ReplicationEvent event) {
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(Constants.OPERATION_TYPE, event.getOperationType().getOperation());
+ keyValue.put(Constants.COLLECTION_NAME, event.getCollectionName());
+ keyValue.put(Constants.NAMESPACE, event.getNamespace());
+ keyValue.put(Constants.REPLICA_SET_NAME, event.getReplicaSetName());
+ return keyValue;
}
- private static void oplogField(Schema schema) {
- schema.setFields(new ArrayList<>());
- Field op = new Field(0, OPERATION_TYPE, FieldType.STRING);
- schema.getFields().add(op);
- Field time = new Field(1, TIMESTAMP, FieldType.INT64);
- schema.getFields().add(time);
- Field v = new Field(2, VERSION, FieldType.INT32);
- schema.getFields().add(v);
- Field namespace = new Field(3, NAMESPACE, FieldType.STRING);
- schema.getFields().add(namespace);
- Field patch = new Field(4, PATCH, FieldType.STRING);
- schema.getFields().add(patch);
- Field objectId = new Field(5, OBJECT_ID, FieldType.STRING);
- schema.getFields().add(objectId);
+ private static Schema getSchema(Object value) {
+ Schema schema = null;
+ if (value instanceof Date) {
+ schema = SchemaBuilder.time().build();
+ } else if (value instanceof Document) {
+ schema = SchemaBuilder.string().build();
+ } else if (value instanceof Long) {
+ schema = SchemaBuilder.int64().build();
+ } else if (value instanceof Integer) {
+ schema = SchemaBuilder.int32().build();
+ } else if (value instanceof Boolean) {
+ schema = SchemaBuilder.bool().build();
+ } else {
+ schema = SchemaBuilder.string().build();
+ }
+ return schema;
}
}
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index a51b727f..ae704c34 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -67,8 +67,6 @@ public class InitSync {
countDownLatch.await();
} catch (InterruptedException e) {
logger.error("init sync wait countDownLatch interrupted");
- } finally {
- copyExecutor.shutdown();
}
}
@@ -76,7 +74,6 @@ public class InitSync {
interestCollections = getInterestCollection();
copyThreadCount = Math.min(interestCollections.size(), context.getCopyThread());
copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() {
-
AtomicInteger threads = new AtomicInteger();
@Override
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Constants.java
index 7ba1ac49..b45eeec5 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Constants.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Constants.java
@@ -31,6 +31,25 @@ public class Constants {
public static final String OBJECT_ID = "o2";
public static final String CREATED = "created";
+
public static final String PATCH = "patch";
+ public static final String DOCUMENT = "document";
+
+ public static final String COLLECTION_NAME = "collectionName";
+
+ public static final String REPLICA_SET_NAME = "replicaSetName";
+
+ public static final String INCREMENT = "increment";
+
+ public static final String INIT_SYNC = "initSync";
+
+ public static String SCHEMA_CREATED_NAME = "mongo_created";
+
+ public static String SCHEMA_OPLOG_NAME = "mongo_oplog";
+
+ public static String MONGO = "MONGO";
+
+ public static final String MAX_TASK = "max.tasks";
+
}
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
index 11bca8ff..41a3364f 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
@@ -109,6 +109,18 @@ public class MongoClientFactory {
sb.append(taskConfig.getZlibCompressionLevel());
}
+ if (taskConfig.getMaxConnectionIdleTime() > 0) {
+ sb.append("&");
+ sb.append("maxConnectionIdleTime=");
+ sb.append(taskConfig.getMaxConnectionIdleTime());
+ }
+
+ if (taskConfig.getSocketKeepAlive()) {
+ sb.append("&");
+ sb.append("socketKeepAlive=");
+ sb.append(true);
+ }
+
if (StringUtils.isNotBlank(taskConfig.getTrustStore())) {
Properties properties = System.getProperties();
properties.put("javax.net.ssl.trustStore", taskConfig.getTrustStore());
@@ -123,7 +135,8 @@ public class MongoClientFactory {
logger.info("connection string :{}", sb.toString());
ConnectionString connectionString = new ConnectionString(sb.toString());
- return MongoClients.create(connectionString);
+ final MongoClient client = MongoClients.create(connectionString);
+ return client;
}
}
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Position.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Position.java
index 29fd8565..830a1e37 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Position.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/Position.java
@@ -64,7 +64,7 @@ public class Position {
return timeStamp > 0 && inc > 0;
}
- public BsonTimestamp converBsonTimeStamp() {
+ public BsonTimestamp convertBsonTimeStamp() {
return new BsonTimestamp(timeStamp, inc);
}
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
index 83933165..123c2680 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
@@ -63,7 +63,14 @@ public class ReplicaSet {
try {
this.mongoClient = replicaSetsContext.createMongoClient(replicaSetConfig);
this.checkReplicaMongo();
- executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
+ final Integer maxTask = replicaSetConfig.getMaxTask();
+ if (maxTask != null && maxTask > 1) {
+ for (int i = 0; i < maxTask; i++) {
+ executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
+ }
+ } else {
+ executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
+ }
} catch (Exception e) {
logger.error("start replicator:{} error", replicaSetConfig, e);
shutdown();
@@ -83,15 +90,7 @@ public class ReplicaSet {
}
public void shutdown() {
- if (running.compareAndSet(true, false)) {
- if (!this.executorService.isShutdown()) {
- executorService.shutdown();
- }
- if (this.mongoClient != null) {
- this.mongoClient.close();
- }
- }
-
+ running.compareAndSet(true, false);
}
public void pause() {
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
index ced90b8b..2892f273 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
@@ -24,6 +24,16 @@ public class ReplicaSetConfig {
private String host;
private Position position;
+ private Integer maxTask;
+
+ public Integer getMaxTask() {
+ return maxTask;
+ }
+
+ public void setMaxTask(Integer maxTask) {
+ this.maxTask = maxTask;
+ }
+
public Position getPosition() {
return position;
}
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
index 8dd85d75..51febda3 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -18,12 +18,9 @@
package org.apache.connect.mongo.replicator;
import com.mongodb.client.MongoClient;
-import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.data.ConnectRecord;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,10 +28,14 @@ import org.apache.connect.mongo.SourceTaskConfig;
import org.apache.connect.mongo.connector.builder.MongoDataEntry;
import org.apache.connect.mongo.initsync.CollectionMeta;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ReplicaSetsContext {
- private BlockingQueue<SourceDataEntry> dataEntryQueue;
+ private Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private BlockingQueue<ConnectRecord> connectRecordQueue;
private SourceTaskConfig taskConfig;
@@ -50,7 +51,7 @@ public class ReplicaSetsContext {
public ReplicaSetsContext(SourceTaskConfig taskConfig) {
this.taskConfig = taskConfig;
this.replicaSets = new ArrayList<>();
- this.dataEntryQueue = new LinkedBlockingDeque<>();
+ this.connectRecordQueue = new LinkedBlockingDeque<>();
this.operationFilter = new OperationFilter(taskConfig);
this.mongoClientFactory = new MongoClientFactory(taskConfig);
}
@@ -88,20 +89,25 @@ public class ReplicaSetsContext {
}
public void publishEvent(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
- SourceDataEntry sourceDataEntry = MongoDataEntry.createSouceDataEntry(event, replicaSetConfig);
+
+ ConnectRecord connectRecord = MongoDataEntry.createSourceDataEntry(event, replicaSetConfig);
+ if (connectRecord == null) {
+ return;
+ }
while (true) {
try {
- dataEntryQueue.put(sourceDataEntry);
+ connectRecordQueue.put(connectRecord);
break;
- } catch (InterruptedException e) {
+ } catch (Exception e) {
+ logger.error("convert error", e);
}
}
}
- public Collection<SourceDataEntry> poll() {
- List<SourceDataEntry> res = new ArrayList<>();
- if (dataEntryQueue.drainTo(res, 20) == 0) {
+ public List<ConnectRecord> poll() {
+ List<ConnectRecord> res = new ArrayList<>();
+ if (connectRecordQueue.drainTo(res, 20) == 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index cd78f241..048dd6d9 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -61,7 +61,7 @@ public class ReplicatorTask implements Runnable {
boolean needDataSync = !userConfigOrRuntimePosition.isValid()
|| userConfigOrRuntimePosition.isInitSync()
// userConfigOrRuntimePosition.position < firstAvailablePosition maybe lost some operations
- || userConfigOrRuntimePosition.converBsonTimeStamp().compareTo(firstAvailablePosition) < 0;
+ || userConfigOrRuntimePosition.convertBsonTimeStamp().compareTo(firstAvailablePosition) < 0;
if (needDataSync) {
recordLastOplogPosition();
@@ -76,7 +76,7 @@ public class ReplicatorTask implements Runnable {
MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
- Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
+ Filters.gt("ts", replicaSetConfig.getPosition().convertBsonTimeStamp()));
MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1))
.noCursorTimeout(true)
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
index b4186667..62152b94 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
@@ -31,6 +31,10 @@ public enum OperationType {
private final String operation;
+ public String getOperation() {
+ return operation;
+ }
+
OperationType(String operation) {
this.operation = operation;
}
diff --git a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
index 7adca716..ac9a666a 100644
--- a/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
+++ b/connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
@@ -17,7 +17,6 @@
package org.apache.connect.mongo.replicator.event;
-import io.openmessaging.connector.api.data.EntryType;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonTimestamp;
@@ -92,19 +91,6 @@ public class ReplicationEvent {
return objectId;
}
- public EntryType getEntryType() {
- switch (operationType) {
- case UPDATE:
- return EntryType.UPDATE;
- case DELETE:
- return EntryType.DELETE;
- case INSERT:
- return EntryType.CREATE;
- default:
- return EntryType.CREATE;
- }
- }
-
public void setOperationType(OperationType operationType) {
this.operationType = operationType;
}
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
index 1e077817..73b9c955 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
@@ -46,7 +46,7 @@ public class MongoFactoryTest {
@Before
public void before() {
- this.replicaSetConfig = new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27027");
+ this.replicaSetConfig = new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27017");
this.sourceTaskConfig = new SourceTaskConfig();
this.mongoClientFactory = new MongoClientFactory(sourceTaskConfig);
}
@@ -170,25 +170,4 @@ public class MongoFactoryTest {
return null;
}
- @Test
- public void testSSLTrustStore() {
- sourceTaskConfig.setMongoUserName("user_test");
- sourceTaskConfig.setMongoPassWord("pwd_test");
- sourceTaskConfig.setSsl(true);
- sourceTaskConfig.setSslInvalidHostNameAllowed(true);
- sourceTaskConfig.setTrustStore("/Users/home/test.pem");
- sourceTaskConfig.setTrustStorePassword("test001");
- sourceTaskConfig.setServerSelectionTimeoutMS(10000);
- MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig);
- MongoCollection<Document> collection = client.getDatabase("test").getCollection("person");
- Document document = new Document();
- document.put("name", "test");
- collection.insertOne(document);
- MongoCursor<Document> iterator = collection.find().iterator();
- while (iterator.hasNext()) {
- System.out.println(iterator.next());
- }
-
- }
-
}
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
index 7206408d..75491645 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -17,18 +17,18 @@
package org.apache.connect.mongo;
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.EntryType;
-import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPosition;
+import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.internal.DefaultKeyValue;
import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.connect.mongo.connector.MongoSourceConnector;
import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.apache.connect.mongo.replicator.Constants;
import org.apache.connect.mongo.replicator.Position;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.ReplicaSetsContext;
@@ -59,17 +59,12 @@ public class MongoSourceConnectorTest {
Assert.assertEquals(mongoSourceConnector.taskClass(), MongoSourceTask.class);
}
- @Test
- public void verifyConfig() {
- String s = mongoSourceConnector.verifyAndSetConfig(keyValue);
- Assert.assertTrue(s.contains("Request config key:"));
- }
@Test
public void testPoll() throws Exception {
- LinkedBlockingQueue<SourceDataEntry> entries = new LinkedBlockingQueue<>();
+ LinkedBlockingQueue<ConnectRecord> entries = new LinkedBlockingQueue<>();
ReplicaSetsContext context = new ReplicaSetsContext(sourceTaskConfig);
- Field dataEntryQueue = ReplicaSetsContext.class.getDeclaredField("dataEntryQueue");
+ Field dataEntryQueue = ReplicaSetsContext.class.getDeclaredField("connectRecordQueue");
dataEntryQueue.setAccessible(true);
dataEntryQueue.set(context, entries);
ReplicationEvent event = new ReplicationEvent();
@@ -80,32 +75,29 @@ public class MongoSourceConnectorTest {
event.setH(324243242L);
event.setEventData(Optional.ofNullable(new Document("testEventKey", "testEventValue")));
event.setObjectId(Optional.empty());
- context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27027"));
- List<SourceDataEntry> sourceDataEntries = (List<SourceDataEntry>) context.poll();
- Assert.assertTrue(sourceDataEntries.size() == 1);
-
- SourceDataEntry sourceDataEntry = sourceDataEntries.get(0);
- Assert.assertEquals("test-person", sourceDataEntry.getQueueName());
+ event.setReplicaSetName("testReplicaName");
+ event.setCollectionName("testCollectName");
+ final ReplicaSetConfig name = new ReplicaSetConfig("", "testReplicaName", "localhost:27027");
+ Position position = new Position();
+ position.setTimeStamp(0);
+ name.setPosition(position);
+ context.publishEvent(event, name);
+ List<ConnectRecord> connectRecords = context.poll();
+ Assert.assertTrue(connectRecords.size() == 1);
- ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
- Assert.assertEquals("testReplicaName", new String(sourcePartition.array()));
+ ConnectRecord connectRecord = connectRecords.get(0);
+ final Struct data = (Struct) connectRecord.getData();
- ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
- Position position = JSONObject.parseObject(new String(sourcePosition.array()), Position.class);
- Assert.assertEquals(position.getTimeStamp(), 1565609506);
- Assert.assertEquals(position.getInc(), 1);
- Assert.assertEquals(position.isInitSync(), false);
+ Assert.assertEquals("test.person", connectRecord.getExtension(Constants.NAMESPACE));
+ Assert.assertEquals("testReplicaName", connectRecord.getExtension(Constants.REPLICA_SET_NAME));
- EntryType entryType = sourceDataEntry.getEntryType();
- Assert.assertEquals(EntryType.CREATE, entryType);
+ final RecordPosition recordPosition = connectRecord.getPosition();
+ final RecordOffset offsetMap = recordPosition.getOffset();
+ Assert.assertEquals(1565609506, offsetMap.getOffset().get(Constants.TIMESTAMP));
- String queueName = sourceDataEntry.getQueueName();
- Assert.assertEquals("test-person", queueName);
+ final List<io.openmessaging.connector.api.data.Field> fields = connectRecord.getSchema().getFields();
+ Assert.assertEquals(1, fields.size());
- Schema schema = sourceDataEntry.getSchema();
- Assert.assertTrue(schema.getFields().size() == 6);
- Object[] payload = sourceDataEntry.getPayload();
- Assert.assertTrue(payload.length == 6);
}
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
index 1e0c82ab..48b5f759 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
@@ -17,18 +17,13 @@
package org.apache.connect.mongo;
-import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.PositionStorageReader;
-import io.openmessaging.connector.api.source.SourceTask;
-import io.openmessaging.connector.api.source.SourceTaskContext;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
import io.openmessaging.internal.DefaultKeyValue;
import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.connect.mongo.connector.MongoSourceTask;
import org.apache.connect.mongo.replicator.ReplicaSet;
@@ -49,7 +44,7 @@ public class MongoSourceTaskTest {
defaultKeyValue.put("serverSelectionTimeoutMS", "10");
defaultKeyValue.put("dataSync", "true");
- Field context = SourceTask.class.getDeclaredField("context");
+ Field context = SourceTask.class.getDeclaredField("sourceTaskContext");
context.setAccessible(true);
context.set(mongoSourceTask, emptyTaskContext());
mongoSourceTask.start(defaultKeyValue);
@@ -69,30 +64,26 @@ public class MongoSourceTaskTest {
Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getReplicaSetName(), "test"));
Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getHost(), "127.0.0.1:27027"));
Assert.assertTrue(replicaSetConfig1.getPosition().getTimeStamp() == 11111111);
- Assert.assertTrue(replicaSetConfig1.getPosition().getInc() == 111);
- Assert.assertTrue(replicaSetConfig1.getPosition().isInitSync());
}
private SourceTaskContext emptyTaskContext() {
return new SourceTaskContext() {
- @Override
- public PositionStorageReader positionStorageReader() {
- return new PositionStorageReader() {
- @Override
- public ByteBuffer getPosition(ByteBuffer partition) {
- return null;
- }
-
- @Override
- public Map<ByteBuffer, ByteBuffer> getPositions(Collection<ByteBuffer> partitions) {
- return null;
- }
- };
+
+ @Override public OffsetStorageReader offsetStorageReader() {
+ return new TestPositionStorageReader();
+ }
+
+ @Override public String getConnectorName() {
+ return "mongoSourceConnector";
+ }
+
+ @Override public String getTaskName() {
+ return "mongoSourceTask";
}
@Override
public KeyValue configs() {
- return null;
+ return new DefaultKeyValue();
}
};
}
@@ -101,12 +92,12 @@ public class MongoSourceTaskTest {
public void testContextStart() throws NoSuchFieldException, IllegalAccessException {
MongoSourceTask mongoSourceTask = new MongoSourceTask();
DefaultKeyValue defaultKeyValue = new DefaultKeyValue();
- defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27027");
+ defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27017");
defaultKeyValue.put("serverSelectionTimeoutMS", "10");
- Field context = SourceTask.class.getDeclaredField("context");
+ Field context = SourceTask.class.getDeclaredField("sourceTaskContext");
context.setAccessible(true);
- context.set(mongoSourceTask, TaskContext());
+ context.set(mongoSourceTask, taskContext());
mongoSourceTask.start(defaultKeyValue);
Field replicaSetsContext = MongoSourceTask.class.getDeclaredField("replicaSetsContext");
@@ -122,37 +113,28 @@ public class MongoSourceTaskTest {
replicaSetConfig.setAccessible(true);
ReplicaSetConfig replicaSetConfig1 = (ReplicaSetConfig) replicaSetConfig.get(replicaSet);
Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getReplicaSetName(), "test"));
- Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getHost(), "127.0.0.1:27027"));
- Assert.assertTrue(replicaSetConfig1.getPosition().getTimeStamp() == 22222222);
- Assert.assertTrue(replicaSetConfig1.getPosition().getInc() == 222);
- Assert.assertTrue(!replicaSetConfig1.getPosition().isInitSync());
+ Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getHost(), "127.0.0.1:27017"));
+ Assert.assertTrue(replicaSetConfig1.getPosition().getTimeStamp() == 0);
}
- private SourceTaskContext TaskContext() {
+ private SourceTaskContext taskContext() {
return new SourceTaskContext() {
- @Override
- public PositionStorageReader positionStorageReader() {
- return new PositionStorageReader() {
- @Override
- public ByteBuffer getPosition(ByteBuffer partition) {
-
- Map<String, Object> po = new HashMap<>();
- po.put("timeStamp", 22222222);
- po.put("inc", 222);
- po.put("initSync", false);
- return ByteBuffer.wrap(JSONObject.toJSONString(po).getBytes());
- }
-
- @Override
- public Map<ByteBuffer, ByteBuffer> getPositions(Collection<ByteBuffer> partitions) {
- return null;
- }
- };
+
+ @Override public OffsetStorageReader offsetStorageReader() {
+ return new TestPositionStorageReader();
+ }
+
+ @Override public String getConnectorName() {
+ return "mongoSourceConnector";
+ }
+
+ @Override public String getTaskName() {
+ return "mongoSourceTask";
}
@Override
public KeyValue configs() {
- return null;
+ return new DefaultKeyValue();
}
};
}
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoTest.java
index 7a67c752..3f8a8175 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -23,11 +23,12 @@ import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
-import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.RecordPosition;
import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.data.Struct;
import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -56,7 +57,7 @@ public class MongoTest {
@Before
public void before() {
MongoClientSettings.Builder builder = MongoClientSettings.builder();
- builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27027"));
+ builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27017"));
mongoClient = MongoClients.create(builder.build());
}
@@ -76,17 +77,16 @@ public class MongoTest {
Assert.assertEquals("test.person", event.getNamespace());
Assert.assertTrue(11111L == event.getH());
Assert.assertEquals(OperationType.INSERT, event.getOperationType());
- Assert.assertEquals(EntryType.CREATE, event.getEntryType());
Assert.assertEquals(document, event.getEventData().get());
Assert.assertEquals("testR", event.getReplicaSetName());
}
@Test
- public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
+ public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException {
MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person");
collection.deleteMany(new Document());
- int count = 1000;
+ int count = 10;
List<String> documents = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Document document = new Document();
@@ -103,6 +103,8 @@ public class MongoTest {
insterest.put("test", collections);
sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest));
ReplicaSetConfig replicaSetConfig = new ReplicaSetConfig("", "test", "localhost");
+ Position position = new Position();
+ replicaSetConfig.setPosition(position);
ReplicaSetsContext replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
Field running = ReplicaSet.class.getDeclaredField("running");
@@ -112,27 +114,16 @@ public class MongoTest {
initSync.start();
int syncCount = 0;
while (syncCount < count) {
- Collection<SourceDataEntry> sourceDataEntries = replicaSetsContext.poll();
- Assert.assertTrue(sourceDataEntries.size() > 0);
- for (SourceDataEntry sourceDataEntry : sourceDataEntries) {
- ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
- Assert.assertEquals("test", new String(sourcePartition.array()));
- ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
- Position position = new Position();
- position.setInitSync(true);
- position.setTimeStamp(0);
- position.setInc(0);
- Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), Position.class));
- EntryType entryType = sourceDataEntry.getEntryType();
- Assert.assertEquals(EntryType.CREATE, entryType);
- String queueName = sourceDataEntry.getQueueName();
- Assert.assertEquals("test-person", queueName);
- Schema schema = sourceDataEntry.getSchema();
- Assert.assertTrue(schema.getFields().size() == 2);
- Object[] payload = sourceDataEntry.getPayload();
- Assert.assertTrue(payload.length == 2);
- Assert.assertEquals(payload[0].toString(), "test.person");
- Assert.assertTrue(documents.contains(JSONObject.parseObject(payload[1].toString(), Document.class).get("_id", JSONObject.class).getString("$oid")));
+ Collection<ConnectRecord> connectRecords = replicaSetsContext.poll();
+ Assert.assertTrue(connectRecords.size() > 0);
+ for (ConnectRecord connectRecord : connectRecords) {
+ final Struct data = (Struct) connectRecord.getData();
+ final Object[] values = data.getValues();
+ Schema schema = connectRecord.getSchema();
+ Assert.assertTrue(schema.getFields().size() == 4);
+ Assert.assertTrue(values.length == 4);
+ final Map<String, ?> partition = connectRecord.getPosition().getPartition().getPartition();
+ Assert.assertEquals("test", partition.get(Constants.REPLICA_SET_NAME));
syncCount++;
}
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
index f19768eb..a723106e 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java
@@ -39,7 +39,7 @@ public class ReplicaContextTest {
@Test
public void testCreateMongoClient() {
- MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027"));
+ MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "", "127.0.0.1:27017"));
MongoIterable<String> collectionNames = mongoClient.getDatabase("local").listCollectionNames();
MongoCursor<String> iterator = collectionNames.iterator();
while (iterator.hasNext()) {
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
index c19098bf..d3edef67 100644
--- a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java
@@ -39,7 +39,7 @@ public class ReplicaSetTest {
@Before
public void before() {
this.sourceTaskConfig = new SourceTaskConfig();
- this.replicaSetConfig = new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027");
+ this.replicaSetConfig = new ReplicaSetConfig("shardName1", "", "127.0.0.1:27017");
this.replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
this.replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext);
}
diff --git a/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/TestPositionStorageReader.java b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/TestPositionStorageReader.java
new file mode 100644
index 00000000..509c1a46
--- /dev/null
+++ b/connectors/rocketmq-connect-mongo/src/test/java/org/apache/connect/mongo/TestPositionStorageReader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.mongo;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import java.util.Collection;
+import java.util.Map;
+
+public class TestPositionStorageReader implements OffsetStorageReader {
+
+ @Override public RecordOffset readOffset(RecordPartition partition) {
+ return null;
+ }
+
+ @Override public Map<RecordPartition, RecordOffset> readOffsets(Collection<RecordPartition> partitions) {
+ return null;
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
index 69bcacf6..366eb3f2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/WorkerConfig.java
@@ -48,7 +48,7 @@ public class WorkerConfig {
* config example:
* namesrvAddr = localhost:9876
*/
- private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+ private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
/**
* Http port for REST API.