You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/07/03 06:22:26 UTC
incubator-rocketmq-externals git commit: To prepare release mysql replicator 1.0.0 version
Repository: incubator-rocketmq-externals
Updated Branches:
refs/heads/master 934e7bc12 -> 62eb9ace6
To prepare release mysql replicator 1.0.0 version
Author: zhaoqun007 <91...@zhaoqun911.cn>
Closes #25 from zhaoqun911/master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/62eb9ace
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/62eb9ace
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/62eb9ace
Branch: refs/heads/master
Commit: 62eb9ace63c5272a24bc3bb32f68e7b9bb84851f
Parents: 934e7bc
Author: zhaoqun007 <91...@zhaoqun911.cn>
Authored: Mon Jul 3 14:22:04 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Mon Jul 3 14:22:04 2017 +0800
----------------------------------------------------------------------
rocketmq-mysql/LICENSE-BIN | 4 ++
rocketmq-mysql/README.md | 2 +-
rocketmq-mysql/doc/dataflow.png | Bin 28402 -> 28277 bytes
rocketmq-mysql/pom.xml | 2 +-
.../java/org/apache/rocketmq/mysql/Config.java | 2 +-
.../org/apache/rocketmq/mysql/Replicator.java | 12 ++---
.../rocketmq/mysql/binlog/EventProcessor.java | 31 +++++++-----
.../rocketmq/mysql/offset/OffsetLogThread.java | 48 -------------------
.../mysql/position/BinlogPositionLogThread.java | 47 ++++++++++++++++++
.../mysql/position/BinlogPositionManager.java | 18 +++++--
.../mysql/productor/RocketMQProducer.java | 2 +-
rocketmq-mysql/src/main/resources/logback.xml | 14 +++---
12 files changed, 103 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/LICENSE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN
index 0810a15..5349fbd 100644
--- a/rocketmq-mysql/LICENSE-BIN
+++ b/rocketmq-mysql/LICENSE-BIN
@@ -295,3 +295,7 @@ The Apache Software Foundation (http://www.apache.org/).
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+------
+This product has a bundle open-replicator, which is available under the ASL2 License.
+The source code of open-replicator can be found at https://github.com/whitesock/open-replicator.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/README.md b/rocketmq-mysql/README.md
index a2f249b..65efb05 100644
--- a/rocketmq-mysql/README.md
+++ b/rocketmq-mysql/README.md
@@ -36,7 +36,7 @@ and sends it to RocketMQ in json format. Besides MySQL, other systems can also c
|mysqlPassword |false | |password of MySQL account|
|mqNamesrvAddr |false | |RocketMQ name server address (e.g.,127.0.0.1:9876)|
|mqTopic |false | |RocketMQ topic name|
-|startType |true |NEW_EVENT |The way that the replicator starts processing data,there are three options available:<br>- NEW_EVENT: starts processing data from the tail of binlog<br>- LAST_PROCESSED: starts processing data from the last processed event<br>- SPECIFIED:starts processing data from the position that user specified,if you choose this option,the binlogFilename and nextPosition must not be null|
+|startType |true |DEFAULT |The way that the replicator starts processing data,there are four options available:<br>- DEFAULT: try to start processing data in the "LAST_PROCESSED" way,if failed, then in the "NEW_EVENT" way<br>- LAST_PROCESSED: starts processing data from the last processed event<br>- NEW_EVENT: starts processing data from the tail of binlog<br>- SPECIFIED: starts processing data from the position that user specified,if you choose this option,the binlogFilename and nextPosition must not be null|
|binlogFilename |true | |If "startType" is "SPECIFIED",the replicator will begin to replicate from this binlog file|
|nextPosition |true | |If "startType" is "SPECIFIED",the replicator will begin to replicate from this position|
|maxTransactionRows|true |100 |max rows of the transaction pushed to RocketMQ|
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/doc/dataflow.png
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/doc/dataflow.png b/rocketmq-mysql/doc/dataflow.png
index 179f24d..ed12b52 100644
Binary files a/rocketmq-mysql/doc/dataflow.png and b/rocketmq-mysql/doc/dataflow.png differ
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml
index f1b050a..c09826c 100644
--- a/rocketmq-mysql/pom.xml
+++ b/rocketmq-mysql/pom.xml
@@ -6,7 +6,7 @@
<groupId>org.apache</groupId>
<artifactId>rocketmq-mysql-replicator</artifactId>
- <version>1.0.0-SNAPSHOT</version>
+ <version>1.0.0</version>
<scm>
<url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</url>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
index ce4c52d..6c14cb4 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
@@ -33,7 +33,7 @@ public class Config {
public String mqNamesrvAddr;
public String mqTopic;
- public String startType = "NEW_EVENT";
+ public String startType = "DEFAULT";
public String binlogFilename;
public Long nextPosition;
public Integer maxTransactionRows = 100;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
index b358567..ae3c984 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.mysql;
import org.apache.rocketmq.mysql.binlog.EventProcessor;
import org.apache.rocketmq.mysql.binlog.Transaction;
-import org.apache.rocketmq.mysql.offset.OffsetLogThread;
+import org.apache.rocketmq.mysql.position.BinlogPositionLogThread;
import org.apache.rocketmq.mysql.productor.RocketMQProducer;
import org.apache.rocketmq.mysql.position.BinlogPosition;
import org.slf4j.Logger;
@@ -29,7 +29,7 @@ public class Replicator {
private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
- private static final Logger OFFSET_LOGGER = LoggerFactory.getLogger("OffsetLogger");
+ private static final Logger POSITION_LOGGER = LoggerFactory.getLogger("PositionLogger");
private Config config;
@@ -57,8 +57,8 @@ public class Replicator {
rocketMQProducer = new RocketMQProducer(config);
rocketMQProducer.start();
- OffsetLogThread offsetLogThread = new OffsetLogThread(this);
- offsetLogThread.start();
+ BinlogPositionLogThread binlogPositionLogThread = new BinlogPositionLogThread(this);
+ binlogPositionLogThread.start();
eventProcessor = new EventProcessor(this);
eventProcessor.start();
@@ -95,7 +95,7 @@ public class Replicator {
}
}
- public void logOffset() {
+ public void logPosition() {
String binlogFilename = null;
long xid = 0L;
@@ -112,7 +112,7 @@ public class Replicator {
}
if (binlogFilename != null) {
- OFFSET_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}",
+ POSITION_LOGGER.info("XID: {}, BINLOG_FILE: {}, NEXT_POSITION: {}, NEXT_OFFSET: {}",
xid, binlogFilename, nextPosition, nextOffset);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
index 515bd64..ba35d3e 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
@@ -37,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import javax.sql.DataSource;
import org.apache.rocketmq.mysql.Config;
@@ -111,7 +112,11 @@ public class EventProcessor {
while (true) {
try {
- BinlogEventV4 event = queue.take();
+ BinlogEventV4 event = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (event == null) {
+ checkConnection();
+ continue;
+ }
switch (event.getHeader().getEventType()) {
case MySQLConstants.TABLE_MAP_EVENT:
@@ -158,8 +163,20 @@ public class EventProcessor {
}
}
- private void processTableMapEvent(BinlogEventV4 event) {
+ private void checkConnection() throws Exception {
+
+ if (!openReplicator.isRunning()) {
+ BinlogPosition binlogPosition = replicator.getNextBinlogPosition();
+ if (binlogPosition != null) {
+ openReplicator.setBinlogFileName(binlogPosition.getBinlogFilename());
+ openReplicator.setBinlogPosition(binlogPosition.getPosition());
+ }
+ openReplicator.start();
+ }
+ }
+
+ private void processTableMapEvent(BinlogEventV4 event) {
TableMapEvent tableMapEvent = (TableMapEvent) event;
String dbName = tableMapEvent.getDatabaseName().toString();
String tableName = tableMapEvent.getTableName().toString();
@@ -171,7 +188,6 @@ public class EventProcessor {
}
private void processWriteEvent(BinlogEventV4 event) {
-
WriteRowsEvent writeRowsEvent = (WriteRowsEvent) event;
Long tableId = writeRowsEvent.getTableId();
List<Row> list = writeRowsEvent.getRows();
@@ -182,7 +198,6 @@ public class EventProcessor {
}
private void processWriteEventV2(BinlogEventV4 event) {
-
WriteRowsEventV2 writeRowsEventV2 = (WriteRowsEventV2) event;
Long tableId = writeRowsEventV2.getTableId();
List<Row> list = writeRowsEventV2.getRows();
@@ -194,7 +209,6 @@ public class EventProcessor {
}
private void processUpdateEvent(BinlogEventV4 event) {
-
UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent) event;
Long tableId = updateRowsEvent.getTableId();
List<Pair<Row>> list = updateRowsEvent.getRows();
@@ -205,7 +219,6 @@ public class EventProcessor {
}
private void processUpdateEventV2(BinlogEventV4 event) {
-
UpdateRowsEventV2 updateRowsEventV2 = (UpdateRowsEventV2) event;
Long tableId = updateRowsEventV2.getTableId();
List<Pair<Row>> list = updateRowsEventV2.getRows();
@@ -216,7 +229,6 @@ public class EventProcessor {
}
private void processDeleteEvent(BinlogEventV4 event) {
-
DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent) event;
Long tableId = deleteRowsEvent.getTableId();
List<Row> list = deleteRowsEvent.getRows();
@@ -228,7 +240,6 @@ public class EventProcessor {
}
private void processDeleteEventV2(BinlogEventV4 event) {
-
DeleteRowsEventV2 deleteRowsEventV2 = (DeleteRowsEventV2) event;
Long tableId = deleteRowsEventV2.getTableId();
List<Row> list = deleteRowsEventV2.getRows();
@@ -243,7 +254,6 @@ public class EventProcessor {
Pattern.compile("^(CREATE|ALTER)\\s+TABLE", Pattern.CASE_INSENSITIVE);
private void processQueryEvent(BinlogEventV4 event) {
-
QueryEvent queryEvent = (QueryEvent) event;
String sql = queryEvent.getSql().toString();
@@ -253,7 +263,6 @@ public class EventProcessor {
}
private void processXidEvent(BinlogEventV4 event) {
-
XidEvent xidEvent = (XidEvent) event;
String binlogFilename = xidEvent.getBinlogFilename();
Long position = xidEvent.getHeader().getNextPosition();
@@ -293,7 +302,7 @@ public class EventProcessor {
}
private void initDataSource() throws Exception {
- Map<String,String> map = new HashMap<>();
+ Map<String, String> map = new HashMap<>();
map.put("driverClassName", "com.mysql.jdbc.Driver");
map.put("url", "jdbc:mysql://" + config.mysqlAddr + ":" + config.mysqlPort + "?useSSL=true&verifyServerCertificate=false");
map.put("username", config.mysqlUsername);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java
deleted file mode 100644
index c7ab4e6..0000000
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.mysql.offset;
-
-import org.apache.rocketmq.mysql.Replicator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OffsetLogThread extends Thread {
- private Logger logger = LoggerFactory.getLogger(OffsetLogThread.class);
-
- private Replicator replicator;
-
- public OffsetLogThread(Replicator replicator) {
- this.replicator = replicator;
- setDaemon(true);
- }
-
- @Override
- public void run() {
-
- while (true) {
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.error("Offset thread interrupted.", e);
- }
-
- replicator.logOffset();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
new file mode 100644
index 0000000..dedb08f
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionLogThread.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+import org.apache.rocketmq.mysql.Replicator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BinlogPositionLogThread extends Thread {
+ private Logger logger = LoggerFactory.getLogger(BinlogPositionLogThread.class);
+
+ private Replicator replicator;
+
+ public BinlogPositionLogThread(Replicator replicator) {
+ this.replicator = replicator;
+ setDaemon(true);
+ }
+
+ @Override
+ public void run() {
+
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.error("Offset thread interrupted.", e);
+ }
+
+ replicator.logPosition();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
index da4eeae..bf621b5 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
@@ -46,16 +46,19 @@ public class BinlogPositionManager {
public void initBeginPosition() throws Exception {
- if (config.startType == null || config.startType.equals("NEW_EVENT")) {
+ if (config.startType == null || config.startType.equals("DEFAULT")) {
+ initPositionDefault();
+ } else if (config.startType.equals("NEW_EVENT")) {
initPositionFromBinlogTail();
- } else if (config.startType.equals("LAST_PROCESSED")) {
+ } else if (config.startType.equals("LAST_PROCESSED")) {
initPositionFromMqTail();
- } else if (config.startType.equals("SPECIFIED")) {
+ } else if (config.startType.equals("SPECIFIED")) {
binlogFilename = config.binlogFilename;
nextPosition = config.nextPosition;
+
}
if (binlogFilename == null || nextPosition == null) {
@@ -63,6 +66,15 @@ public class BinlogPositionManager {
}
}
+ private void initPositionDefault() throws Exception {
+ initPositionFromMqTail();
+
+ if (binlogFilename == null || nextPosition == null) {
+ initPositionFromBinlogTail();
+ }
+
+ }
+
private void initPositionFromMqTail() throws Exception {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP");
consumer.setNamesrvAddr(config.mqNamesrvAddr);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
index 49ca06f..38aca7f 100644
--- a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
@@ -49,4 +49,4 @@ public class RocketMQProducer {
return sendResult.getQueueOffset();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/62eb9ace/rocketmq-mysql/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/resources/logback.xml b/rocketmq-mysql/src/main/resources/logback.xml
index 4d0292e..d4993de 100644
--- a/rocketmq-mysql/src/main/resources/logback.xml
+++ b/rocketmq-mysql/src/main/resources/logback.xml
@@ -46,11 +46,11 @@
</encoder>
</appender>
- <appender name="OffsetAppender"
+ <appender name="PositionAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <file>./logs/offset.log</file>
+ <file>./logs/position.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
- <fileNamePattern>./logs/offset.%i.log
+ <fileNamePattern>./logs/position.%i.log
</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>10</maxIndex>
@@ -60,20 +60,20 @@
<maxFileSize>10MB</maxFileSize>
</triggeringPolicy>
<encoder>
- <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p - %m%n</pattern>
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<root>
- <level value="DEBUG"/>
+ <level value="INFO"/>
<appender-ref ref="DefaultConsoleAppender"/>
<appender-ref ref="DefaultFileAppender"/>
</root>
- <logger name="OffsetLogger" additivity="false">
+ <logger name="PositionLogger" additivity="false">
<level value="INFO"/>
- <appender-ref ref="OffsetAppender"/>
+ <appender-ref ref="PositionAppender"/>
</logger>
</configuration>