You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:22:33 UTC
[rocketmq-connect] 10/13: fix some bug and add readme
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 2cdb27a2b2e418a1228c0d83adf6acff64e79554
Author: 李平 <17...@qq.com>
AuthorDate: Thu Aug 22 15:10:57 2019 +0800
fix some bug and add readme
---
README.md | 22 +++
.../org/apache/connect/mongo/SourceTaskConfig.java | 188 +++++++++++----------
.../mongo/connector/MongoSourceConnector.java | 17 ++
.../connect/mongo/connector/MongoSourceTask.java | 49 +++---
.../mongo/connector/builder/MongoDataEntry.java | 32 +++-
.../connect/mongo/initsync/CollectionMeta.java | 17 ++
.../apache/connect/mongo/initsync/InitSync.java | 27 ++-
.../apache/connect/mongo/replicator/Constants.java | 23 ++-
.../apache/connect/mongo/replicator/Filter.java | 19 ++-
.../mongo/replicator/MongoClientFactory.java | 38 +++--
.../apache/connect/mongo/replicator/Position.java | 85 ++++++++++
.../connect/mongo/replicator/ReplicaSet.java | 32 +++-
.../connect/mongo/replicator/ReplicaSetConfig.java | 93 ++--------
.../{ReplicaSets.java => ReplicaSetManager.java} | 29 +++-
.../mongo/replicator/ReplicaSetsContext.java | 29 +++-
.../connect/mongo/replicator/ReplicatorTask.java | 62 +++++--
...Converter.java => Document2EventConverter.java} | 27 ++-
.../mongo/replicator/event/OperationType.java | 54 ++++--
.../mongo/replicator/event/ReplicationEvent.java | 17 ++
.../java/org/apache/connect/mongo/FilterTest.java | 3 +-
.../org/apache/connect/mongo/MongoFactoryTest.java | 22 +--
.../connect/mongo/MongoSourceConnectorTest.java | 3 +-
.../apache/connect/mongo/MongoSourceTaskTest.java | 3 +-
.../java/org/apache/connect/mongo/MongoTest.java | 25 ++-
.../apache/connect/mongo/OperationTypeTest.java | 37 ++++
...icaSetsTest.java => ReplicaSetManagerTest.java} | 20 +--
26 files changed, 687 insertions(+), 286 deletions(-)
diff --git a/README.md b/README.md
index fb2c6c6..e32c455 100644
--- a/README.md
+++ b/README.md
@@ -10,3 +10,25 @@ and then init a mongo replicaSet
`docker exec -it mongo-test mongo ` and `rs.initiate()` and then you can run all junit test
+
+
+## task config params
+
+| param | Description | type |
+| --- | --- | --- |
+| mongoAddr | shardName=replicaSetName/127.0.0.1:2781,127.0.0.1:2782,127.0.0.1:2783; | string, split by ; |
+| mongoUserName | mongo root username| string |
+| mongoPassWord | mongo root password| string |
+| interestDbAndCollection | {"dbName":["collection1","collection2"]}, collectionName can be "*" means all collection | json |
+| positionTimeStamp | mongo oplog `bsontimestamp.value`, runtime store position is highest level | int |
+| positionInc | mongo oplog `bsontimestamp.inc`, runtime store position is highest level | int |
+| dataSync | sync all interestDbAndCollection data, runtime store position is highest level | json, Map<String(dbName), List<String(collectionName)>> |
+| serverSelectionTimeoutMS | mongo driver select replicaServer timeout | long |
+| connectTimeoutMS | mongo driver connect socket timeout | long |
+| socketTimeoutMS | mongo driver read or write timeout | long |
+| ssl or tsl | mongo driver use ssl or tsl | boolean |
+| tlsInsecure or sslInvalidHostNameAllowed | mongo driver when use ssl or tsl allow invalid hostname | boolean|
+| compressors | compressors way | string (zlib or snappy)
+| zlibCompressionLevel | zlib compressors level| int (1-7)|
+| trustStore | ssl pem| path|
+| trustStorePassword | ssl pem decrypt password | string|
diff --git a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
index 3df9dc4..d184b5c 100644
--- a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
+++ b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo;
import io.openmessaging.KeyValue;
@@ -9,22 +26,21 @@ import org.bson.BsonTimestamp;
public class SourceTaskConfig {
- private String replicaSet;
private String mongoAddr;
private String mongoUserName;
private String mongoPassWord;
private String interestDbAndCollection;
- private String positionTimeStamp;
- private String positionInc;
- private String dataSync;
- private String serverSelectionTimeoutMS;
- private String connectTimeoutMS;
- private String socketTimeoutMS;
- private String ssl;
- private String tsl;
- private String tlsInsecure;
- private String sslInvalidHostNameAllowed;
- private String tlsAllowInvalidHostnames;
+ private int positionTimeStamp;
+ private int positionInc;
+ private boolean dataSync;
+ private long serverSelectionTimeoutMS;
+ private long connectTimeoutMS;
+ private long socketTimeoutMS;
+ private boolean ssl;
+ private boolean tsl;
+ private boolean tlsInsecure;
+ private boolean sslInvalidHostNameAllowed;
+ private boolean tlsAllowInvalidHostnames;
private String compressors;
private String zlibCompressionLevel;
private String trustStore;
@@ -37,52 +53,28 @@ public class SourceTaskConfig {
}
});
- public String getTrustStore() {
- return trustStore;
- }
-
- public void setTrustStore(String trustStore) {
- this.trustStore = trustStore;
- }
-
- public String getTrustStorePassword() {
- return trustStorePassword;
- }
-
- public void setTrustStorePassword(String trustStorePassword) {
- this.trustStorePassword = trustStorePassword;
- }
-
- public String getZlibCompressionLevel() {
- return zlibCompressionLevel;
- }
-
- public void setZlibCompressionLevel(String zlibCompressionLevel) {
- this.zlibCompressionLevel = zlibCompressionLevel;
- }
-
- public String getPositionInc() {
- return positionInc;
+ public String getMongoAddr() {
+ return mongoAddr;
}
- public void setPositionInc(String positionInc) {
- this.positionInc = positionInc;
+ public void setMongoAddr(String mongoAddr) {
+ this.mongoAddr = mongoAddr;
}
- public int getCopyThread() {
- return copyThread;
+ public String getMongoUserName() {
+ return mongoUserName;
}
- public void setCopyThread(int copyThread) {
- this.copyThread = copyThread;
+ public void setMongoUserName(String mongoUserName) {
+ this.mongoUserName = mongoUserName;
}
- public String getPositionTimeStamp() {
- return positionTimeStamp;
+ public String getMongoPassWord() {
+ return mongoPassWord;
}
- public void setPositionTimeStamp(String positionTimeStamp) {
- this.positionTimeStamp = positionTimeStamp;
+ public void setMongoPassWord(String mongoPassWord) {
+ this.mongoPassWord = mongoPassWord;
}
public String getInterestDbAndCollection() {
@@ -93,107 +85,91 @@ public class SourceTaskConfig {
this.interestDbAndCollection = interestDbAndCollection;
}
- public String getMongoAddr() {
- return mongoAddr;
- }
-
- public void setMongoAddr(String mongoAddr) {
- this.mongoAddr = mongoAddr;
- }
-
- public String getMongoUserName() {
- return mongoUserName;
+ public int getPositionTimeStamp() {
+ return positionTimeStamp;
}
- public void setMongoUserName(String mongoUserName) {
- this.mongoUserName = mongoUserName;
+ public void setPositionTimeStamp(int positionTimeStamp) {
+ this.positionTimeStamp = positionTimeStamp;
}
- public String getMongoPassWord() {
- return mongoPassWord;
+ public int getPositionInc() {
+ return positionInc;
}
- public void setMongoPassWord(String mongoPassWord) {
- this.mongoPassWord = mongoPassWord;
+ public void setPositionInc(int positionInc) {
+ this.positionInc = positionInc;
}
- public String getDataSync() {
+ public boolean isDataSync() {
return dataSync;
}
- public void setDataSync(String dataSync) {
+ public void setDataSync(boolean dataSync) {
this.dataSync = dataSync;
}
- public String getReplicaSet() {
- return replicaSet;
- }
-
- public String getServerSelectionTimeoutMS() {
+ public long getServerSelectionTimeoutMS() {
return serverSelectionTimeoutMS;
}
- public void setServerSelectionTimeoutMS(String serverSelectionTimeoutMS) {
+ public void setServerSelectionTimeoutMS(long serverSelectionTimeoutMS) {
this.serverSelectionTimeoutMS = serverSelectionTimeoutMS;
}
- public void setReplicaSet(String replicaSet) {
- this.replicaSet = replicaSet;
- }
-
- public String getConnectTimeoutMS() {
+ public long getConnectTimeoutMS() {
return connectTimeoutMS;
}
- public void setConnectTimeoutMS(String connectTimeoutMS) {
+ public void setConnectTimeoutMS(long connectTimeoutMS) {
this.connectTimeoutMS = connectTimeoutMS;
}
- public String getSocketTimeoutMS() {
+ public long getSocketTimeoutMS() {
return socketTimeoutMS;
}
- public void setSocketTimeoutMS(String socketTimeoutMS) {
+ public void setSocketTimeoutMS(long socketTimeoutMS) {
this.socketTimeoutMS = socketTimeoutMS;
}
- public String getSsl() {
+ public boolean getSsl() {
return ssl;
}
- public void setSsl(String ssl) {
+ public void setSsl(boolean ssl) {
this.ssl = ssl;
}
- public String getTsl() {
+ public boolean getTsl() {
return tsl;
}
- public void setTsl(String tsl) {
+ public void setTsl(boolean tsl) {
this.tsl = tsl;
}
- public String getTlsInsecure() {
+ public boolean getTlsInsecure() {
return tlsInsecure;
}
- public void setTlsInsecure(String tlsInsecure) {
+ public void setTlsInsecure(boolean tlsInsecure) {
this.tlsInsecure = tlsInsecure;
}
- public String getSslInvalidHostNameAllowed() {
+ public boolean getSslInvalidHostNameAllowed() {
return sslInvalidHostNameAllowed;
}
- public void setSslInvalidHostNameAllowed(String sslInvalidHostNameAllowed) {
+ public void setSslInvalidHostNameAllowed(boolean sslInvalidHostNameAllowed) {
this.sslInvalidHostNameAllowed = sslInvalidHostNameAllowed;
}
- public String getTlsAllowInvalidHostnames() {
+ public boolean getTlsAllowInvalidHostnames() {
return tlsAllowInvalidHostnames;
}
- public void setTlsAllowInvalidHostnames(String tlsAllowInvalidHostnames) {
+ public void setTlsAllowInvalidHostnames(boolean tlsAllowInvalidHostnames) {
this.tlsAllowInvalidHostnames = tlsAllowInvalidHostnames;
}
@@ -205,6 +181,38 @@ public class SourceTaskConfig {
this.compressors = compressors;
}
+ public String getZlibCompressionLevel() {
+ return zlibCompressionLevel;
+ }
+
+ public void setZlibCompressionLevel(String zlibCompressionLevel) {
+ this.zlibCompressionLevel = zlibCompressionLevel;
+ }
+
+ public String getTrustStore() {
+ return trustStore;
+ }
+
+ public void setTrustStore(String trustStore) {
+ this.trustStore = trustStore;
+ }
+
+ public String getTrustStorePassword() {
+ return trustStorePassword;
+ }
+
+ public void setTrustStorePassword(String trustStorePassword) {
+ this.trustStorePassword = trustStorePassword;
+ }
+
+ public int getCopyThread() {
+ return copyThread;
+ }
+
+ public void setCopyThread(int copyThread) {
+ this.copyThread = copyThread;
+ }
+
public void load(KeyValue props) {
properties2Object(props, this);
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
index e3dfb6f..5be2e0d 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.connector;
import io.openmessaging.KeyValue;
diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
index da244cd..49bcf49 100644
--- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
+++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.connector;
import com.alibaba.fastjson.JSONObject;
@@ -7,13 +24,10 @@ import io.openmessaging.connector.api.source.SourceTask;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
-import java.util.regex.Pattern;
-import org.apache.commons.lang3.StringUtils;
import org.apache.connect.mongo.SourceTaskConfig;
-import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.Position;
import org.apache.connect.mongo.replicator.ReplicaSet;
-import org.apache.connect.mongo.replicator.ReplicaSetConfig;
-import org.apache.connect.mongo.replicator.ReplicaSets;
+import org.apache.connect.mongo.replicator.ReplicaSetManager;
import org.apache.connect.mongo.replicator.ReplicaSetsContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -24,12 +38,10 @@ public class MongoSourceTask extends SourceTask {
private SourceTaskConfig sourceTaskConfig;
- private ReplicaSets replicaSets;
+ private ReplicaSetManager replicaSetManager;
private ReplicaSetsContext replicaSetsContext;
- private Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
-
@Override
public Collection<SourceDataEntry> poll() {
@@ -41,24 +53,23 @@ public class MongoSourceTask extends SourceTask {
try {
sourceTaskConfig = new SourceTaskConfig();
sourceTaskConfig.load(config);
+
replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig);
- replicaSets = ReplicaSets.create(sourceTaskConfig.getMongoAddr());
- replicaSets.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
+
+ replicaSetManager = ReplicaSetManager.create(sourceTaskConfig.getMongoAddr());
+
+ replicaSetManager.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> {
ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap(
replicaSetName.getBytes()));
if (byteBuffer != null && byteBuffer.array().length > 0) {
String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8);
- ReplicaSetConfig.Position position = JSONObject.parseObject(positionJson, ReplicaSetConfig.Position.class);
+ Position position = JSONObject.parseObject(positionJson, Position.class);
replicaSetConfig.setPosition(position);
} else {
- ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
- position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp() != null
- && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches()
- ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0);
- position.setInc(sourceTaskConfig.getPositionInc() != null
- && pattern.matcher(sourceTaskConfig.getPositionInc()).matches()
- ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0);
- position.setInitSync(StringUtils.equals(sourceTaskConfig.getDataSync(), Constants.INITSYNC) ? true : false);
+ Position position = new Position();
+ position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp());
+ position.setInc(sourceTaskConfig.getPositionInc());
+ position.setInitSync(sourceTaskConfig.isDataSync());
replicaSetConfig.setPosition(position);
}
diff --git a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
index 87d92ea..1d6dfe5 100644
--- a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
+++ b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.connector.builder;
import com.alibaba.fastjson.JSONObject;
@@ -10,6 +27,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.Position;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
@@ -17,8 +35,8 @@ import org.bson.BsonTimestamp;
import static org.apache.connect.mongo.replicator.Constants.CREATED;
import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECTID;
-import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE;
+import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
+import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
import static org.apache.connect.mongo.replicator.Constants.PATCH;
import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
import static org.apache.connect.mongo.replicator.Constants.VERSION;
@@ -48,12 +66,12 @@ public class MongoDataEntry {
dataEntryBuilder.timestamp(System.currentTimeMillis())
.queue(event.getNamespace().replace(".", "-").replace("$", "-"))
.entryType(event.getEntryType());
- dataEntryBuilder.putFiled(OPERATIONTYPE, event.getOperationType().name());
+ dataEntryBuilder.putFiled(OPERATION_TYPE, event.getOperationType().name());
dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue());
dataEntryBuilder.putFiled(VERSION, event.getV());
dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : "");
- dataEntryBuilder.putFiled(OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
+ dataEntryBuilder.putFiled(OBJECT_ID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : "");
}
String position = createPosition(event, replicaSetConfig);
@@ -64,7 +82,7 @@ public class MongoDataEntry {
}
private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) {
- ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+ Position position = new Position();
BsonTimestamp timestamp = event.getTimestamp();
position.setInc(timestamp != null ? timestamp.getInc() : 0);
position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0);
@@ -99,7 +117,7 @@ public class MongoDataEntry {
private static void oplogField(Schema schema) {
schema.setFields(new ArrayList<>());
- Field op = new Field(0, OPERATIONTYPE, FieldType.STRING);
+ Field op = new Field(0, OPERATION_TYPE, FieldType.STRING);
schema.getFields().add(op);
Field time = new Field(1, TIMESTAMP, FieldType.INT64);
schema.getFields().add(time);
@@ -109,7 +127,7 @@ public class MongoDataEntry {
schema.getFields().add(namespace);
Field patch = new Field(4, PATCH, FieldType.STRING);
schema.getFields().add(patch);
- Field objectId = new Field(5, OBJECTID, FieldType.STRING);
+ Field objectId = new Field(5, OBJECT_ID, FieldType.STRING);
schema.getFields().add(objectId);
}
diff --git a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
index 9c73eac..4af5060 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.initsync;
public class CollectionMeta {
diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
index 6cdbe06..3d68fac 100644
--- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
+++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.initsync;
import com.mongodb.client.MongoClient;
@@ -13,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.connect.mongo.replicator.ReplicaSet;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.ReplicaSetsContext;
-import org.apache.connect.mongo.replicator.event.EventConverter;
+import org.apache.connect.mongo.replicator.event.Document2EventConverter;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.bson.Document;
@@ -115,24 +132,24 @@ public class InitSync {
.batchSize(200)
.iterator();
while (replicaSet.isRuning() && mongoCursor.hasNext()) {
- if (context.initSyncAbort()) {
+ if (context.isInitSyncAbort()) {
logger.info("init sync database:{}, collection:{} abort, has copy:{} document", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
return;
}
count++;
Document document = mongoCursor.next();
- ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
+ ReplicationEvent event = Document2EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
event.setOperationType(OperationType.CREATED);
event.setNamespace(collectionMeta.getNameSpace());
context.publishEvent(event, replicaSetConfig);
}
} catch (Exception e) {
- context.initSyncError();
+ context.setInitSyncError();
+ replicaSet.shutdown();
logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace(), e);
} finally {
countDownLatch.countDown();
- replicaSet.shutdown();
}
logger.info("database:{}, collection:{}, copy {} documents, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count);
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
index c895bd6..7ba1ac4 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Constants.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator;
public class Constants {
@@ -5,17 +22,15 @@ public class Constants {
public static final String MONGO_LOCAL_DATABASE = "local";
public static final String MONGO_OPLOG_RS = "oplog.rs";
- public static final String OPERATIONTYPE = "op";
+ public static final String OPERATION_TYPE = "op";
public static final String TIMESTAMP = "ts";
public static final String VERSION = "v";
public static final String HASH = "h";
public static final String NAMESPACE = "ns";
public static final String OPERATION = "o";
- public static final String OBJECTID = "o2";
+ public static final String OBJECT_ID = "o2";
public static final String CREATED = "created";
public static final String PATCH = "patch";
- public static final String INITSYNC = "initSync";
-
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
index fd26163..a517822 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator;
import com.alibaba.fastjson.JSONObject;
@@ -49,7 +66,7 @@ public class Filter {
return false;
};
- notNoopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal();
+ notNoopFilter = (operationType) -> !operationType.equals(OperationType.NOOP);
}
public boolean filterMeta(CollectionMeta collectionMeta) {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
index f5e01a3..11bca8f 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator;
import com.mongodb.ConnectionString;
@@ -38,46 +55,46 @@ public class MongoClientFactory {
sb.append(replicaSetConfig.getReplicaSetName());
}
- if (StringUtils.isNotBlank(taskConfig.getServerSelectionTimeoutMS())) {
+ if (taskConfig.getServerSelectionTimeoutMS() > 0) {
sb.append("&");
sb.append("serverSelectionTimeoutMS=");
sb.append(taskConfig.getServerSelectionTimeoutMS());
}
- if (StringUtils.isNotBlank(taskConfig.getConnectTimeoutMS())) {
+ if (taskConfig.getConnectTimeoutMS() > 0) {
sb.append("&");
sb.append("connectTimeoutMS=");
sb.append(taskConfig.getConnectTimeoutMS());
}
- if (StringUtils.isNotBlank(taskConfig.getSocketTimeoutMS())) {
+ if (taskConfig.getSocketTimeoutMS() > 0) {
sb.append("&");
sb.append("socketTimeoutMS=");
sb.append(taskConfig.getSocketTimeoutMS());
}
- if (StringUtils.isNotBlank(taskConfig.getSsl()) || StringUtils.isNotBlank(taskConfig.getTsl())) {
+ if (taskConfig.getSsl() || taskConfig.getTsl()) {
sb.append("&");
sb.append("ssl=");
sb.append(true);
}
- if (StringUtils.isNotBlank(taskConfig.getTlsInsecure())) {
+ if (taskConfig.getTlsInsecure()) {
sb.append("&");
sb.append("tlsInsecure=");
- sb.append(taskConfig.getTlsInsecure());
+ sb.append(true);
}
- if (StringUtils.isNotBlank(taskConfig.getTlsAllowInvalidHostnames())) {
+ if (taskConfig.getTlsAllowInvalidHostnames()) {
sb.append("&");
sb.append("tlsAllowInvalidHostnames=");
- sb.append(taskConfig.getTlsAllowInvalidHostnames());
+ sb.append(true);
}
- if (StringUtils.isNotBlank(taskConfig.getSslInvalidHostNameAllowed())) {
+ if (taskConfig.getSslInvalidHostNameAllowed()) {
sb.append("&");
sb.append("sslInvalidHostNameAllowed=");
- sb.append(taskConfig.getSslInvalidHostNameAllowed());
+ sb.append(true);
}
if (StringUtils.isNotBlank(taskConfig.getCompressors())) {
@@ -105,7 +122,6 @@ public class MongoClientFactory {
}
logger.info("connection string :{}", sb.toString());
- System.out.println(sb.toString());
ConnectionString connectionString = new ConnectionString(sb.toString());
return MongoClients.create(connectionString);
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/Position.java b/src/main/java/org/apache/connect/mongo/replicator/Position.java
new file mode 100644
index 0000000..29fd856
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/Position.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.mongo.replicator;
+
+import java.util.Objects;
+import org.bson.BsonTimestamp;
+
+public class Position {
+
+ private int timeStamp;
+ private int inc;
+ private boolean initSync;
+
+ public int getTimeStamp() {
+ return timeStamp;
+ }
+
+ public void setTimeStamp(int timeStamp) {
+ this.timeStamp = timeStamp;
+ }
+
+ public int getInc() {
+ return inc;
+ }
+
+ public void setInc(int inc) {
+ this.inc = inc;
+ }
+
+ public boolean isInitSync() {
+ return initSync;
+ }
+
+ public void setInitSync(boolean initSync) {
+ this.initSync = initSync;
+ }
+
+ public Position() {
+
+ }
+
+ public Position(int timeStamp, int inc, boolean initSync) {
+ this.timeStamp = timeStamp;
+ this.inc = inc;
+ this.initSync = initSync;
+ }
+
+ public boolean isValid() {
+ return timeStamp > 0 && inc > 0;
+ }
+
+ public BsonTimestamp converBsonTimeStamp() {
+ return new BsonTimestamp(timeStamp, inc);
+ }
+
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ Position position = (Position) o;
+ return timeStamp == position.timeStamp &&
+ inc == position.inc &&
+ initSync == position.initSync;
+ }
+
+ @Override public int hashCode() {
+ return Objects.hash(timeStamp, inc, initSync);
+ }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
index 8f4d0d8..8393316 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator;
import com.mongodb.client.MongoClient;
@@ -38,14 +55,14 @@ public class ReplicaSet {
}
public void start() {
+ if (!running.compareAndSet(false, true)) {
+ logger.info("the java mongo replica already start");
+ return;
+ }
try {
- if (!running.compareAndSet(false, true)) {
- logger.info("the java mongo replica already start");
- return;
- }
this.mongoClient = replicaSetsContext.createMongoClient(replicaSetConfig);
- this.isReplicaMongo();
+ this.checkReplicaMongo();
executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext));
} catch (Exception e) {
logger.error("start replicator:{} error", replicaSetConfig, e);
@@ -53,16 +70,15 @@ public class ReplicaSet {
}
}
- public boolean isReplicaMongo() {
+ public void checkReplicaMongo() {
MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE);
MongoIterable<String> collectionNames = local.listCollectionNames();
MongoCursor<String> iterator = collectionNames.iterator();
while (iterator.hasNext()) {
if (StringUtils.equals(MONGO_OPLOG_RS, iterator.next())) {
- return true;
+ return;
}
}
- this.shutdown();
throw new IllegalStateException(String.format("url:%s, is not replica", replicaSetConfig.getHost()));
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
index 1b54b17..ced90b8 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java
@@ -1,7 +1,21 @@
-package org.apache.connect.mongo.replicator;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import java.util.Objects;
-import org.bson.BsonTimestamp;
+package org.apache.connect.mongo.replicator;
public class ReplicaSetConfig {
@@ -48,10 +62,6 @@ public class ReplicaSetConfig {
this.host = host;
}
- public Position emptyPosition() {
- return new Position(0, 0, true);
- }
-
@Override
public String toString() {
return "ReplicaSetConfig{" +
@@ -62,73 +72,4 @@ public class ReplicaSetConfig {
'}';
}
- public class Position {
- private int timeStamp;
- private int inc;
- private boolean initSync;
-
- public int getTimeStamp() {
- return timeStamp;
- }
-
- public void setTimeStamp(int timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- public int getInc() {
- return inc;
- }
-
- public void setInc(int inc) {
- this.inc = inc;
- }
-
- public boolean isInitSync() {
- return initSync;
- }
-
- public void setInitSync(boolean initSync) {
- this.initSync = initSync;
- }
-
- public Position(int timeStamp, int inc, boolean initSync) {
- this.timeStamp = timeStamp;
- this.inc = inc;
- this.initSync = initSync;
- }
-
- public boolean isValid() {
- return timeStamp > 0;
- }
-
- public BsonTimestamp converBsonTimeStamp() {
- return new BsonTimestamp(timeStamp, inc);
- }
-
- @Override
- public String toString() {
- return "Position{" +
- "timeStamp=" + timeStamp +
- ", inc=" + inc +
- ", initSync=" + initSync +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
- Position position = (Position) o;
- return timeStamp == position.timeStamp &&
- inc == position.inc &&
- initSync == position.initSync;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(timeStamp, inc, initSync);
- }
- }
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
similarity index 65%
rename from src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
rename to src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
index af1ebeb..c5757d8 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator;
import java.util.HashMap;
@@ -9,13 +26,15 @@ import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
-public class ReplicaSets {
+public class ReplicaSetManager {
private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)");
+ private static final String HOST_SEPARATOR = ";";
+
private final Map<String, ReplicaSetConfig> replicaConfigByName = new HashMap<>();
- public ReplicaSets(Set<ReplicaSetConfig> replicaSetConfigs) {
+ public ReplicaSetManager(Set<ReplicaSetConfig> replicaSetConfigs) {
replicaSetConfigs.forEach(replicaSetConfig -> {
if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) {
replicaConfigByName.put(replicaSetConfig.getReplicaSetName(), replicaSetConfig);
@@ -25,10 +44,10 @@ public class ReplicaSets {
validate();
}
- public static ReplicaSets create(String hosts) {
+ public static ReplicaSetManager create(String hosts) {
Set<ReplicaSetConfig> replicaSetConfigs = new HashSet<>();
if (hosts != null) {
- for (String replicaSetStr : StringUtils.split(hosts.trim(), ";")) {
+ for (String replicaSetStr : StringUtils.split(hosts.trim(), HOST_SEPARATOR)) {
if (StringUtils.isNotBlank(replicaSetStr)) {
ReplicaSetConfig replicaSetConfig = parseReplicaSetStr(replicaSetStr);
if (replicaSetConfig != null) {
@@ -37,7 +56,7 @@ public class ReplicaSets {
}
}
}
- return new ReplicaSets(replicaSetConfigs);
+ return new ReplicaSetManager(replicaSetConfigs);
}
private static ReplicaSetConfig parseReplicaSetStr(String hosts) {
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
index e599f5b..b067256 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java
@@ -1,12 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator;
import com.mongodb.client.MongoClient;
import io.openmessaging.connector.api.data.SourceDataEntry;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.connect.mongo.SourceTaskConfig;
@@ -28,9 +46,12 @@ public class ReplicaSetsContext {
private MongoClientFactory mongoClientFactory;
+ private Map<String, Position> lastPositionMap;
+
public ReplicaSetsContext(SourceTaskConfig taskConfig) {
this.taskConfig = taskConfig;
- this.replicaSets = new CopyOnWriteArrayList<>();
+ this.replicaSets = new ArrayList<>();
+ this.lastPositionMap = new HashMap<>();
this.dataEntryQueue = new LinkedBlockingDeque<>();
this.filter = new Filter(taskConfig);
this.mongoClientFactory = new MongoClientFactory(taskConfig);
@@ -91,11 +112,11 @@ public class ReplicaSetsContext {
return res;
}
- public boolean initSyncAbort() {
+ public boolean isInitSyncAbort() {
return initSyncAbort.get();
}
- public void initSyncError() {
+ public void setInitSyncError() {
initSyncAbort.set(true);
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
index 6cb46d1..4c142ce 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator;
import com.mongodb.CursorType;
@@ -7,8 +24,9 @@ import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import org.apache.connect.mongo.initsync.InitSync;
-import org.apache.connect.mongo.replicator.event.EventConverter;
+import org.apache.connect.mongo.replicator.event.Document2EventConverter;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
+import org.bson.BsonTimestamp;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,19 +54,27 @@ public class ReplicatorTask implements Runnable {
@Override
public void run() {
- if (replicaSetConfig.getPosition() == null || replicaSetConfig.getPosition().isInitSync()) {
+ BsonTimestamp firstAvailablePosition = findOplogFirstPosition();
+
+ // inValid or
+ // user config dataSync or
+ // user config or runtime saved position lt first oplog position maybe some operation is lost so need dataSync
+ if (!replicaSetConfig.getPosition().isValid() || replicaSetConfig.getPosition().isInitSync()
+ || replicaSetConfig.getPosition().converBsonTimeStamp().compareTo(firstAvailablePosition) < 0) {
+ recordOplogLastPosition();
InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet);
initSync.start();
+
}
- MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
- FindIterable<Document> iterable;
- if (replicaSetConfig.getPosition().isValid()) {
- iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
- Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
- } else {
- iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
+ if (!replicaSet.isRuning() || !replicaSetsContext.isInitSyncAbort()) {
+ return;
}
+
+ MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
+ FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(
+ Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp()));
+
MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1))
.noCursorTimeout(true)
.cursorType(CursorType.TailableAwait)
@@ -70,10 +96,26 @@ public class ReplicatorTask implements Runnable {
logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig);
}
+ private BsonTimestamp findOplogFirstPosition() {
+ MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
+ FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
+ Document lastOplog = iterable.sort(new Document("$natural", 1)).limit(1).first();
+ BsonTimestamp timestamp = lastOplog.get(Constants.TIMESTAMP, BsonTimestamp.class);
+ return timestamp;
+ }
+
+ private void recordOplogLastPosition() {
+ MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE);
+ FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find();
+ Document lastOplog = iterable.sort(new Document("$natural", -1)).limit(1).first();
+ BsonTimestamp timestamp = lastOplog.get(Constants.TIMESTAMP, BsonTimestamp.class);
+ replicaSetConfig.setPosition(new Position(timestamp.getTime(), timestamp.getInc(), false));
+ }
+
private void executorCursor(MongoCursor<Document> cursor) {
while (cursor.hasNext() && !replicaSet.isPause()) {
Document document = cursor.next();
- ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
+ ReplicationEvent event = Document2EventConverter.convert(document, replicaSetConfig.getReplicaSetName());
if (replicaSetsContext.filterEvent(event)) {
replicaSetsContext.publishEvent(event, replicaSetConfig);
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java b/src/main/java/org/apache/connect/mongo/replicator/event/Document2EventConverter.java
similarity index 58%
rename from src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
rename to src/main/java/org/apache/connect/mongo/replicator/event/Document2EventConverter.java
index 1b48990..99ab707 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/Document2EventConverter.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator.event;
import java.util.Optional;
@@ -6,24 +23,24 @@ import org.bson.Document;
import static org.apache.connect.mongo.replicator.Constants.HASH;
import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECTID;
+import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
import static org.apache.connect.mongo.replicator.Constants.OPERATION;
-import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE;
+import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
import static org.apache.connect.mongo.replicator.Constants.VERSION;
-public class EventConverter {
+public class Document2EventConverter {
public static ReplicationEvent convert(Document document, String replicaSetName) {
ReplicationEvent event = new ReplicationEvent();
- event.setOperationType(OperationType.getOperationType(document.getString(OPERATIONTYPE)));
+ event.setOperationType(OperationType.getOperationType(document.getString(OPERATION_TYPE)));
event.setTimestamp(document.get(TIMESTAMP, BsonTimestamp.class));
event.setH(document.getLong(HASH));
event.setV(document.getInteger(VERSION));
event.setNamespace(document.getString(NAMESPACE));
event.setEventData(Optional.ofNullable(document.get(OPERATION, Document.class)));
- event.setObjectId(Optional.ofNullable(document.get(OBJECTID, Document.class)));
+ event.setObjectId(Optional.ofNullable(document.get(OBJECT_ID, Document.class)));
event.setReplicaSetName(replicaSetName);
event.setDocument(document);
return event;
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
index 54df394..b418666 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
@@ -1,28 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator.event;
+import org.apache.commons.lang3.StringUtils;
+
public enum OperationType {
INSERT("i"),
UPDATE("u"),
DELETE("d"),
NOOP("n"),
- DBCOMMAND("c"),
+ DB_COMMAND("c"),
CREATED("created"),
UNKNOWN("unknown");
- private final String operationStr;
+ private final String operation;
- OperationType(String operationStr) {
- this.operationStr = operationStr;
+ OperationType(String operation) {
+ this.operation = operation;
}
- public static OperationType getOperationType(String operationStr) {
- for (OperationType operationType : OperationType.values()) {
- if (operationType.operationStr.equals(operationStr)) {
- return operationType;
- }
+ public static OperationType getOperationType(String operation) {
+
+ if (StringUtils.isEmpty(operation)) {
+ return UNKNOWN;
+ }
+
+ switch (operation) {
+ case "i":
+ return INSERT;
+ case "u":
+ return UPDATE;
+ case "d":
+ return DELETE;
+ case "n":
+ return NOOP;
+ case "c":
+ return DB_COMMAND;
+ case "created":
+ return CREATED;
+ default:
+ return UNKNOWN;
}
- return UNKNOWN;
}
}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
index 6407781..7adca71 100644
--- a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.connect.mongo.replicator.event;
import io.openmessaging.connector.api.data.EntryType;
diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java
index a804b8b..31c8f4d 100644
--- a/src/test/java/org/apache/connect/mongo/FilterTest.java
+++ b/src/test/java/org/apache/connect/mongo/FilterTest.java
@@ -59,8 +59,9 @@ public class FilterTest {
ReplicationEvent replicationEvent = new ReplicationEvent();
replicationEvent.setOperationType(OperationType.NOOP);
Assert.assertFalse(filter.filterEvent(replicationEvent));
- replicationEvent.setOperationType(OperationType.DBCOMMAND);
+ replicationEvent.setOperationType(OperationType.DB_COMMAND);
Assert.assertTrue(filter.filterEvent(replicationEvent));
+
}
}
diff --git a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
index 93adeeb..0f02064 100644
--- a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java
@@ -37,7 +37,7 @@ public class MongoFactoryTest {
@Test
public void testCreateMongoClientWithSSL() {
- sourceTaskConfig.setSsl("ssl");
+ sourceTaskConfig.setSsl(true);
MongoClientSettings settings = getSettings();
System.out.println(settings.getSslSettings());
Assert.assertTrue(settings.getSslSettings().isEnabled());
@@ -45,7 +45,7 @@ public class MongoFactoryTest {
@Test
public void testCreateMongoClientWithTSL() {
- sourceTaskConfig.setTsl("tsl");
+ sourceTaskConfig.setTsl(true);
MongoClientSettings settings = getSettings();
System.out.println(settings.getSslSettings());
Assert.assertTrue(settings.getSslSettings().isEnabled());
@@ -55,7 +55,7 @@ public class MongoFactoryTest {
public void testCreateMongoClientWithserverSelectionTimeoutMS() {
try {
replicaSetConfig.setReplicaSetName("testReplicatSet");
- sourceTaskConfig.setServerSelectionTimeoutMS("150");
+ sourceTaskConfig.setServerSelectionTimeoutMS(150);
System.out.println(getSettings().getClusterSettings());
Assert.assertTrue(getSettings().getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS) == 150);
} catch (MongoTimeoutException exception) {
@@ -65,7 +65,7 @@ public class MongoFactoryTest {
@Test
public void testCreateMongoClientWithConnectTimeoutMS() {
- sourceTaskConfig.setConnectTimeoutMS("1200");
+ sourceTaskConfig.setConnectTimeoutMS(1200);
System.out.println(getSettings().getSocketSettings());
Assert.assertTrue(getSettings().getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS) == 1200);
@@ -73,40 +73,40 @@ public class MongoFactoryTest {
@Test
public void testCreateMongoClientWithSocketTimeoutMS() {
- sourceTaskConfig.setSocketTimeoutMS("1100");
+ sourceTaskConfig.setSocketTimeoutMS(1100);
System.out.println(getSettings().getSocketSettings());
Assert.assertTrue(getSettings().getSocketSettings().getReadTimeout(TimeUnit.MILLISECONDS) == 1100);
}
@Test
public void testCreateMongoClientWithInvalidHostNameAllowed() {
- sourceTaskConfig.setSslInvalidHostNameAllowed("true");
+ sourceTaskConfig.setSslInvalidHostNameAllowed(true);
System.out.println(getSettings().getSslSettings());
Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
- sourceTaskConfig.setSslInvalidHostNameAllowed("false");
+ sourceTaskConfig.setSslInvalidHostNameAllowed(false);
System.out.println(getSettings().getSslSettings());
Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
}
@Test
public void testCreateMongoClientWithInvalidHostNameAllowedTsl() {
- sourceTaskConfig.setTlsAllowInvalidHostnames("true");
+ sourceTaskConfig.setTlsAllowInvalidHostnames(true);
System.out.println(getSettings().getSslSettings());
Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
- sourceTaskConfig.setTlsAllowInvalidHostnames("false");
+ sourceTaskConfig.setTlsAllowInvalidHostnames(false);
System.out.println(getSettings().getSslSettings());
Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
}
@Test
public void testCreateMongoClientWithTlsinsecure() {
- sourceTaskConfig.setTlsInsecure("true");
+ sourceTaskConfig.setTlsInsecure(true);
System.out.println(getSettings().getSslSettings());
Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed());
- sourceTaskConfig.setTlsInsecure("false");
+ sourceTaskConfig.setTlsInsecure(false);
System.out.println(getSettings().getSslSettings());
Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed());
}
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
index f85e4a9..cc02fbc 100644
--- a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java
@@ -12,6 +12,7 @@ import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.connect.mongo.connector.MongoSourceConnector;
import org.apache.connect.mongo.connector.MongoSourceTask;
+import org.apache.connect.mongo.replicator.Position;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.ReplicaSetsContext;
import org.apache.connect.mongo.replicator.event.OperationType;
@@ -73,7 +74,7 @@ public class MongoSourceConnectorTest {
Assert.assertEquals("testReplicaName", new String(sourcePartition.array()));
ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
- ReplicaSetConfig.Position position = JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class);
+ Position position = JSONObject.parseObject(new String(sourcePosition.array()), Position.class);
Assert.assertEquals(position.getTimeStamp(), 1565609506);
Assert.assertEquals(position.getInc(), 1);
Assert.assertEquals(position.isInitSync(), false);
diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
index b696393..4983a66 100644
--- a/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java
@@ -14,7 +14,6 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.connect.mongo.connector.MongoSourceTask;
-import org.apache.connect.mongo.replicator.Constants;
import org.apache.connect.mongo.replicator.ReplicaSet;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.ReplicaSetsContext;
@@ -31,7 +30,7 @@ public class MongoSourceTaskTest {
defaultKeyValue.put("positionTimeStamp", "11111111");
defaultKeyValue.put("positionInc", "111");
defaultKeyValue.put("serverSelectionTimeoutMS", "10");
- defaultKeyValue.put("dataSync", Constants.INITSYNC);
+ defaultKeyValue.put("dataSync", "true");
Field context = SourceTask.class.getDeclaredField("context");
context.setAccessible(true);
diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java
index 98e9a42..3d900fa 100644
--- a/src/test/java/org/apache/connect/mongo/MongoTest.java
+++ b/src/test/java/org/apache/connect/mongo/MongoTest.java
@@ -19,10 +19,11 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.connect.mongo.initsync.InitSync;
import org.apache.connect.mongo.replicator.Constants;
+import org.apache.connect.mongo.replicator.Position;
import org.apache.connect.mongo.replicator.ReplicaSet;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.ReplicaSetsContext;
-import org.apache.connect.mongo.replicator.event.EventConverter;
+import org.apache.connect.mongo.replicator.event.Document2EventConverter;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.bson.BsonTimestamp;
@@ -49,11 +50,11 @@ public class MongoTest {
oplog.put(Constants.TIMESTAMP, timestamp);
oplog.put(Constants.NAMESPACE, "test.person");
oplog.put(Constants.HASH, 11111L);
- oplog.put(Constants.OPERATIONTYPE, "i");
+ oplog.put(Constants.OPERATION_TYPE, "i");
Document document = new Document();
document.put("test", "test");
oplog.put(Constants.OPERATION, document);
- ReplicationEvent event = EventConverter.convert(oplog, "testR");
+ ReplicationEvent event = Document2EventConverter.convert(oplog, "testR");
Assert.assertEquals(timestamp, event.getTimestamp());
Assert.assertEquals("test.person", event.getNamespace());
Assert.assertTrue(11111L == event.getH());
@@ -95,16 +96,16 @@ public class MongoTest {
int syncCount = 0;
while (syncCount < count) {
Collection<SourceDataEntry> sourceDataEntries = replicaSetsContext.poll();
- Assert.assertNotNull(sourceDataEntries);
+ Assert.assertTrue(sourceDataEntries.size() > 0);
for (SourceDataEntry sourceDataEntry : sourceDataEntries) {
ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition();
Assert.assertEquals("test", new String(sourcePartition.array()));
ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition();
- ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition();
+ Position position = new Position();
position.setInitSync(true);
position.setTimeStamp(0);
position.setInc(0);
- Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class));
+ Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), Position.class));
EntryType entryType = sourceDataEntry.getEntryType();
Assert.assertEquals(EntryType.CREATE, entryType);
String queueName = sourceDataEntry.getQueueName();
@@ -122,4 +123,16 @@ public class MongoTest {
Assert.assertTrue(syncCount == count);
}
+
+ @Test
+ public void testCompareBsonTimestamp() {
+ BsonTimestamp lt = new BsonTimestamp(11111111, 1);
+ BsonTimestamp gt = new BsonTimestamp(11111111, 2);
+ Assert.assertTrue(lt.compareTo(gt) < 0);
+
+ lt = new BsonTimestamp(11111111, 1);
+ gt = new BsonTimestamp(22222222, 1);
+ Assert.assertTrue(lt.compareTo(gt) < 0);
+
+ }
}
diff --git a/src/test/java/org/apache/connect/mongo/OperationTypeTest.java b/src/test/java/org/apache/connect/mongo/OperationTypeTest.java
new file mode 100644
index 0000000..d8c5a9b
--- /dev/null
+++ b/src/test/java/org/apache/connect/mongo/OperationTypeTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.connect.mongo;
+
+import org.apache.connect.mongo.replicator.event.OperationType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OperationTypeTest {
+
+ @Test
+ public void testGetOperationType() {
+ Assert.assertEquals(OperationType.INSERT, OperationType.getOperationType("i"));
+ Assert.assertEquals(OperationType.UPDATE, OperationType.getOperationType("u"));
+ Assert.assertEquals(OperationType.DELETE, OperationType.getOperationType("d"));
+ Assert.assertEquals(OperationType.NOOP, OperationType.getOperationType("n"));
+ Assert.assertEquals(OperationType.DB_COMMAND, OperationType.getOperationType("c"));
+ Assert.assertEquals(OperationType.CREATED, OperationType.getOperationType("created"));
+ Assert.assertEquals(OperationType.UNKNOWN, OperationType.getOperationType("test"));
+
+ }
+}
diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetManagerTest.java
similarity index 75%
rename from src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
rename to src/test/java/org/apache/connect/mongo/ReplicaSetManagerTest.java
index 5276f4f..1d3b743 100644
--- a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java
+++ b/src/test/java/org/apache/connect/mongo/ReplicaSetManagerTest.java
@@ -2,26 +2,26 @@ package org.apache.connect.mongo;
import java.util.Map;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
-import org.apache.connect.mongo.replicator.ReplicaSets;
+import org.apache.connect.mongo.replicator.ReplicaSetManager;
import org.junit.Assert;
import org.junit.Test;
-public class ReplicaSetsTest {
+public class ReplicaSetManagerTest {
@Test(expected = IllegalArgumentException.class)
public void testCreatReplicaSetsExceptionWithoutMongoAddr() {
- ReplicaSets.create("");
+ ReplicaSetManager.create("");
}
@Test(expected = IllegalArgumentException.class)
public void testCreatReplicaSetsExceptioWithoutReplicaSetName() {
- ReplicaSets.create("127.0.0.1:27081");
+ ReplicaSetManager.create("127.0.0.1:27081");
}
@Test
public void testCreatReplicaSetsSpecialReplicaSetName() {
- ReplicaSets replicaSets = ReplicaSets.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
- Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+ ReplicaSetManager replicaSetManager = ReplicaSetManager.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
+ Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSetManager.getReplicaConfigByName();
Assert.assertTrue(replicaSetConfigMap.size() == 1);
Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
@@ -30,8 +30,8 @@ public class ReplicaSetsTest {
@Test
public void testCreatReplicaSetsSpecialShardNameAndReplicaSetName() {
- ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
- Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+ ReplicaSetManager replicaSetManager = ReplicaSetManager.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083");
+ Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSetManager.getReplicaConfigByName();
Assert.assertTrue(replicaSetConfigMap.size() == 1);
Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
@@ -41,8 +41,8 @@ public class ReplicaSetsTest {
@Test
public void testCreatReplicaSetsMutiMongoAddr() {
- ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283");
- Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName();
+ ReplicaSetManager replicaSetManager = ReplicaSetManager.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283");
+ Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSetManager.getReplicaConfigByName();
Assert.assertTrue(replicaSetConfigMap.size() == 2);
Assert.assertNotNull(replicaSetConfigMap.get("replicaName1"));
Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());