You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/06/15 09:50:25 UTC

[1/6] incubator-rocketmq-externals git commit: RocketMQ-MySQL 1.0-snapshot closes apache/incubator-rocketmq-externals#24

Repository: incubator-rocketmq-externals
Updated Branches:
  refs/heads/master 90be3667a -> 54020df25


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java
new file mode 100644
index 0000000..a6197a3
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Schema.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.rocketmq.mysql.binlog.EventProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Schema {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
+
+    private static final List<String> IGNORED_DATABASES = new ArrayList<>(
+        Arrays.asList(new String[] {"information_schema", "mysql", "performance_schema", "sys"})
+    );
+
+    private DataSource dataSource;
+
+    private Map<String, Database> dbMap;
+
+    public Schema(DataSource dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    public void load() throws SQLException {
+
+        dbMap = new HashMap<>();
+
+        String sql = "select schema_name from information_schema.schemata";
+
+        Connection conn = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            conn = dataSource.getConnection();
+
+            ps = conn.prepareStatement(sql);
+            rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String dbName = rs.getString(1);
+                if (!IGNORED_DATABASES.contains(dbName)) {
+                    Database database = new Database(dbName, dataSource);
+                    dbMap.put(dbName, database);
+                }
+            }
+
+        } finally {
+
+            if (conn != null) {
+                conn.close();
+            }
+            if (ps != null) {
+                ps.close();
+            }
+            if (rs != null) {
+                rs.close();
+            }
+        }
+
+        for (Database db : dbMap.values()) {
+            db.init();
+        }
+
+    }
+
+    public Table getTable(String dbName, String tableName) {
+
+        if (dbMap == null) {
+            reload();
+        }
+
+        Database database = dbMap.get(dbName);
+        if (database == null) {
+            return null;
+        }
+
+        Table table = database.getTable(tableName);
+        if (table == null) {
+            return null;
+        }
+
+        return table;
+    }
+
+    private void reload() {
+
+        while (true) {
+
+            try {
+                load();
+                break;
+            } catch (Exception e) {
+                LOGGER.error("Reload schema error.", e);
+            }
+        }
+    }
+
+    public void reset() {
+        dbMap = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java
new file mode 100644
index 0000000..175ec80
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Table.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.rocketmq.mysql.schema.column.ColumnParser;
+
+/**
+ * Describes one table: the owning database, the table name, and two lists that
+ * callers fill through {@link #addCol(String)} and {@link #addParser(ColumnParser)}.
+ */
+public class Table {
+
+    private String database;
+    private String name;
+
+    /** Column names in the order they were added. */
+    private List<String> colList = new LinkedList<>();
+
+    /** Column parsers in the order they were added. */
+    private List<ColumnParser> parserList = new LinkedList<>();
+
+    /**
+     * @param database name of the database the table belongs to
+     * @param table    name of the table
+     */
+    public Table(String database, String table) {
+        this.name = table;
+        this.database = database;
+    }
+
+    /** @return the database name given at construction */
+    public String getDatabase() {
+        return this.database;
+    }
+
+    /** @return the table name given at construction */
+    public String getName() {
+        return this.name;
+    }
+
+    /** Appends a column name to the end of the column list. */
+    public void addCol(String column) {
+        this.colList.add(column);
+    }
+
+    /** Appends a parser to the end of the parser list. */
+    public void addParser(ColumnParser columnParser) {
+        this.parserList.add(columnParser);
+    }
+
+    /** @return the live (not copied) list of column names */
+    public List<String> getColList() {
+        return this.colList;
+    }
+
+    /** @return the live (not copied) list of column parsers */
+    public List<ColumnParser> getParserList() {
+        return this.parserList;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java
new file mode 100644
index 0000000..667db75
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/BigIntColumnParser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.math.BigInteger;
+
+/**
+ * Parser for {@code bigint} columns. Values of unsigned columns that arrive as a
+ * negative {@code Long} are mapped to the matching non-negative {@link BigInteger}.
+ */
+public class BigIntColumnParser extends ColumnParser {
+
+    /** 2^64, added to a negative long to obtain its unsigned 64-bit interpretation. */
+    private static final BigInteger TWO_POW_64 = BigInteger.ONE.shiftLeft(64);
+
+    private boolean signed;
+
+    /**
+     * @param colType full column type text, for example {@code "bigint(20) unsigned"}
+     */
+    public BigIntColumnParser(String colType) {
+        // "unsigned" may be followed by further attributes ("bigint(20) unsigned zerofill"),
+        // so it must not be required to be the last word of the type.
+        this.signed = !colType.contains(" unsigned");
+    }
+
+    /**
+     * @param value {@code null}, a {@link BigInteger} or a {@link Long}
+     * @return {@code null} and {@code BigInteger} unchanged; a negative {@code Long} of
+     *         an unsigned column as {@code 2^64 + value}; any other {@code Long} unchanged
+     * @throws ClassCastException if {@code value} is of any other type
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof BigInteger) {
+            return value;
+        }
+
+        Long l = (Long) value;
+        if (!signed && l < 0) {
+            return TWO_POW_64.add(BigInteger.valueOf(l));
+        }
+        return l;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java
new file mode 100644
index 0000000..0a63410
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/ColumnParser.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Base class of the per-column value converters, plus the factory that picks the
+ * converter for a column and a helper that reads the value list of an
+ * {@code enum}/{@code set} column type.
+ */
+public abstract class ColumnParser {
+
+    /** Matches a complete {@code enum(...)} or {@code set(...)} type; group 2 is the value list. */
+    private static final Pattern ENUM_OR_SET_TYPE = Pattern.compile("(enum|set)\\((.*)\\)");
+
+    /** One single-quoted value; a quote inside the value is written as two quotes. */
+    private static final Pattern QUOTED_VALUE = Pattern.compile("'((?:[^']|'')*)'");
+
+    /**
+     * Creates the parser for a column.
+     *
+     * @param dataType bare data type name such as {@code "int"} or {@code "varchar"}
+     * @param colType  full column type text such as {@code "int(10) unsigned"}
+     * @param charset  character set name, used only for the text types
+     * @return the matching parser; a {@link DefaultColumnParser} for every type not listed here
+     */
+    public static ColumnParser getColumnParser(String dataType, String colType, String charset) {
+
+        switch (dataType) {
+            case "tinyint":
+            case "smallint":
+            case "mediumint":
+            case "int":
+                return new IntColumnParser(dataType, colType);
+            case "bigint":
+                return new BigIntColumnParser(colType);
+            case "tinytext":
+            case "text":
+            case "mediumtext":
+            case "longtext":
+            case "varchar":
+            case "char":
+                return new StringColumnParser(charset);
+            case "date":
+            case "datetime":
+            case "timestamp":
+                return new DateTimeColumnParser();
+            case "time":
+                return new TimeColumnParser();
+            case "year":
+                return new YearColumnParser();
+            case "enum":
+                return new EnumColumnParser(colType);
+            case "set":
+                return new SetColumnParser(colType);
+            default:
+                return new DefaultColumnParser();
+        }
+    }
+
+    /**
+     * Extracts the declared values of an {@code enum} or {@code set} column type, in
+     * declaration order.
+     *
+     * <p>Each single-quoted item becomes one value, so values may contain commas, and
+     * a doubled quote inside an item is turned back into one quote. Empty values are
+     * kept, including a trailing one.
+     *
+     * @param colType column type text, for example {@code "enum('a','b')"}
+     * @return the values with the surrounding quotes removed
+     * @throws IllegalArgumentException if {@code colType} is not of the form
+     *                                  {@code enum(...)} or {@code set(...)}
+     */
+    public static String[] extractEnumValues(String colType) {
+
+        Matcher typeMatcher = ENUM_OR_SET_TYPE.matcher(colType);
+        if (!typeMatcher.matches()) {
+            throw new IllegalArgumentException("Not an enum/set column type: " + colType);
+        }
+
+        String valueList = typeMatcher.group(2);
+
+        // Fully qualified so that this method needs no additional imports.
+        java.util.List<String> values = new java.util.ArrayList<String>();
+        Matcher valueMatcher = QUOTED_VALUE.matcher(valueList);
+        while (valueMatcher.find()) {
+            values.add(valueMatcher.group(1).replace("''", "'"));
+        }
+
+        if (values.isEmpty()) {
+            // No quoted item at all: keep the historic plain comma split for such input.
+            return valueList.replace("'", "").split(",");
+        }
+
+        return values.toArray(new String[values.size()]);
+    }
+
+    /**
+     * Converts a raw column value into the representation this parser produces.
+     *
+     * @param value raw value, may be {@code null}
+     * @return the converted value
+     */
+    public abstract Object getValue(Object value);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
new file mode 100644
index 0000000..97339d8
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DateTimeColumnParser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+
+/**
+ * Parser that renders {@link Timestamp} values as {@code yyyy-MM-dd HH:mm:ss} text.
+ */
+public class DateTimeColumnParser extends ColumnParser {
+
+    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+
+    /**
+     * @param value raw column value, may be {@code null}
+     * @return the formatted text for a {@code Timestamp}; every other value
+     *         (including {@code null}) unchanged
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+            // SimpleDateFormat keeps mutable state and is not safe to share between
+            // threads, so a formatter is created per call instead of a static one.
+            return new SimpleDateFormat(DATE_TIME_PATTERN).format((Timestamp) value);
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java
new file mode 100644
index 0000000..46eb48e
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/DefaultColumnParser.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import org.apache.commons.codec.binary.Base64;
+
+public class DefaultColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof byte[]) {
+            return Base64.encodeBase64String((byte[]) value);
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java
new file mode 100644
index 0000000..2942103
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/EnumColumnParser.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+public class EnumColumnParser extends ColumnParser {
+
+    private String[] enumValues;
+
+    public EnumColumnParser(String colType) {
+        enumValues = extractEnumValues(colType);
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return value;
+        }
+
+        Integer i = (Integer) value;
+        if (i == 0) {
+            return null;
+        } else {
+            return enumValues[i - 1];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java
new file mode 100644
index 0000000..1041436
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/IntColumnParser.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+public class IntColumnParser extends ColumnParser {
+
+    private int bits;
+    private boolean signed;
+
+    public IntColumnParser(String dataType, String colType) {
+
+        switch (dataType) {
+            case "tinyint":
+                bits = 8;
+                break;
+            case "smallint":
+                bits = 16;
+                break;
+            case "mediumint":
+                bits = 24;
+                break;
+            case "int":
+                bits = 32;
+        }
+
+        this.signed = !colType.matches(".* unsigned$");
+    }
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Long) {
+            return value;
+        }
+
+
+
+        if (value instanceof Integer) {
+
+            Integer i = (Integer) value;
+            if (signed || i > 0) {
+                return i;
+            } else {
+                return (1L << bits) + i;
+            }
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java
new file mode 100644
index 0000000..1fbb151
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/SetColumnParser.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+/**
+ * Parser for {@code set} columns: turns a bit mask into the comma-separated list of
+ * the declared values whose bits are set.
+ */
+public class SetColumnParser extends ColumnParser {
+
+    /** Declared values in declaration order; the value at index i belongs to bit i. */
+    private String[] enumValues;
+
+    /**
+     * @param colType full column type text from which the value list is extracted
+     */
+    public SetColumnParser(String colType) {
+        this.enumValues = extractEnumValues(colType);
+    }
+
+    /**
+     * @param value {@code null}, a {@code String} or a {@code Long} bit mask
+     * @return {@code null} and strings unchanged; otherwise the selected values joined
+     *         with {@code ","} (an empty string when no bit is set)
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null || value instanceof String) {
+            return value;
+        }
+
+        long mask = (Long) value;
+        StringBuilder joined = new StringBuilder();
+
+        // Empty before the first selected value, "," before every later one.
+        String separator = "";
+        for (int bit = 0; bit < enumValues.length; bit++) {
+            boolean selected = ((mask >> bit) & 1L) == 1L;
+            if (!selected) {
+                continue;
+            }
+            joined.append(separator).append(enumValues[bit]);
+            separator = ",";
+        }
+
+        return joined.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java
new file mode 100644
index 0000000..a76e6f1
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/StringColumnParser.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.nio.charset.Charset;
+
+/**
+ * Parser for text columns: decodes byte arrays with the Java charset that
+ * corresponds to the column's character set name.
+ */
+public class StringColumnParser extends ColumnParser {
+
+    private String charset;
+
+    /**
+     * @param charset character set name of the column, for example {@code "utf8"}
+     */
+    public StringColumnParser(String charset) {
+        this.charset = charset;
+    }
+
+    /**
+     * @param value {@code null}, a {@code String} or a {@code byte[]}
+     * @return {@code null} and strings unchanged; a byte array decoded into a string
+     */
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null || value instanceof String) {
+            return value;
+        }
+
+        byte[] raw = (byte[]) value;
+        return new String(raw, Charset.forName(javaCharsetName()));
+    }
+
+    /**
+     * Translates the column's character set name into a Java charset name; names
+     * without a special mapping are used as they are.
+     */
+    private String javaCharsetName() {
+        switch (charset) {
+            case "utf8":
+            case "utf8mb4":
+                return "UTF-8";
+            case "latin1":
+            case "ascii":
+                return "ISO-8859-1";
+            case "ucs2":
+                return "UTF-16";
+            default:
+                return charset;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java
new file mode 100644
index 0000000..113a4e5
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/TimeColumnParser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public class TimeColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+
+            Time time = new Time(((Timestamp) value).getTime());
+            return time;
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java
new file mode 100644
index 0000000..0419933
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/column/YearColumnParser.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema.column;
+
+import java.sql.Date;
+import java.util.Calendar;
+
+public class YearColumnParser extends ColumnParser {
+
+    @Override
+    public Object getValue(Object value) {
+
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Date) {
+            Calendar calendar = Calendar.getInstance();
+            calendar.setTime((Date) value);
+            return calendar.get(Calendar.YEAR);
+        }
+
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/resources/logback.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/resources/logback.xml b/rocketmq-mysql/src/main/resources/logback.xml
new file mode 100644
index 0000000..4d0292e
--- /dev/null
+++ b/rocketmq-mysql/src/main/resources/logback.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+
+    <!-- Console output for everything that reaches the root logger. -->
+    <appender name="DefaultConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss,GMT+8} %p - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <!-- Main log file: rolls at 100MB and keeps the archives numbered 1 to 10. -->
+    <appender name="DefaultFileAppender"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>./logs/rocketmq_mysql.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>./logs/rocketmq_mysql.%i.log</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss,GMT+8} %p - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <!-- Separate file for the "OffsetLogger" logger: rolls at 10MB, archives 1 to 10. -->
+    <appender name="OffsetAppender"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>./logs/offset.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>./logs/offset.%i.log</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>10MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss,GMT+8} %p - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <root>
+        <level value="DEBUG"/>
+        <appender-ref ref="DefaultConsoleAppender"/>
+        <appender-ref ref="DefaultFileAppender"/>
+    </root>
+
+    <!-- additivity="false": offset records go to the offset file only, not to the root appenders. -->
+    <logger name="OffsetLogger" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="OffsetAppender"/>
+    </logger>
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf b/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf
new file mode 100644
index 0000000..4a7a35f
--- /dev/null
+++ b/rocketmq-mysql/src/main/resources/rocketmq_mysql.conf
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+mysqlAddr=
+mysqlPort=
+mysqlUsername=
+mysqlPassword=
+
+mqNamesrvAddr=
+mqTopic=
+
+#startType=
+#binlogFilename=
+#nextPosition=
+#maxTransactionRows=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java
new file mode 100644
index 0000000..8f153f2
--- /dev/null
+++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/BigIntColumnParserTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import java.math.BigInteger;
+import org.apache.rocketmq.mysql.schema.column.BigIntColumnParser;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class BigIntColumnParserTest {
+
+    @Test
+    public void testBigInt() {
+        // For an unsigned bigint column the raw value Long.MIN_VALUE is expected to come back as 2^63.
+        BigIntColumnParser parser = new BigIntColumnParser("bigint(20) unsigned");
+
+        BigInteger actual = (BigInteger) parser.getValue(Long.MIN_VALUE);
+        BigInteger expected = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE);
+        assertEquals(expected, actual);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java
new file mode 100644
index 0000000..c24af54
--- /dev/null
+++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/EnumColumnParserTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.schema.column.EnumColumnParser;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class EnumColumnParserTest {
+
+    @Test
+    public void testEnum() {
+        // Index 3 is expected to resolve to the third declared member, i.e. indexes are 1-based.
+        String colType = "enum('a','b','c','d')";
+
+        EnumColumnParser parser = new EnumColumnParser(colType);
+        String actual = (String) parser.getValue(3);
+        assertEquals("c", actual);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java
new file mode 100644
index 0000000..33bbc8e
--- /dev/null
+++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/IntColumnParserTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.schema.column.IntColumnParser;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class IntColumnParserTest {
+
+    @Test
+    public void testInt() {
+        IntColumnParser parser = new IntColumnParser("int", "int(10) unsigned");
+        // For an unsigned int column, Integer.MIN_VALUE is expected to come back as 2^31.
+        Long actual = (Long) parser.getValue(Integer.MIN_VALUE);
+        Long expected = Long.valueOf(Integer.MAX_VALUE) + 1;
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testSmallint() {
+        // For an unsigned smallint column, Short.MIN_VALUE is expected to come back as 2^15.
+        IntColumnParser parser = new IntColumnParser("smallint", "smallint(5) unsigned");
+
+        Long actual = (Long) parser.getValue(Integer.valueOf(Short.MIN_VALUE));
+        Long expected = Long.valueOf(Short.MAX_VALUE + 1);
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testTinyint() {
+        // For an unsigned tinyint column, Byte.MIN_VALUE is expected to come back as 2^7.
+        IntColumnParser parser = new IntColumnParser("tinyint", "tinyint(3) unsigned");
+
+        Long actual = (Long) parser.getValue(Integer.valueOf(Byte.MIN_VALUE));
+        Long expected = Long.valueOf(Byte.MAX_VALUE + 1);
+        assertEquals(expected, actual);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java
new file mode 100644
index 0000000..f762364
--- /dev/null
+++ b/rocketmq-mysql/src/test/java/org/apache/rocketmq/mysql/SetColumnParserTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.schema.column.SetColumnParser;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SetColumnParserTest {
+
+    @Test
+    public void testSet() {
+        // NOTE(review): assumes the value is a bitmask over the members; 9 == binary 1001 -> 'a' and 'd'.
+        String colType = "set('a','b','c','d')";
+
+        SetColumnParser parser = new SetColumnParser(colType);
+        String actual = (String) parser.getValue(9L);
+        assertEquals("a,d", actual);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/style/copyright/Apache.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/style/copyright/Apache.xml b/rocketmq-mysql/style/copyright/Apache.xml
new file mode 100644
index 0000000..e3e3dec
--- /dev/null
+++ b/rocketmq-mysql/style/copyright/Apache.xml
@@ -0,0 +1,23 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <copyright>
+        <option name="myName" value="Apache" />
+        <option name="notice" value="Licensed to the Apache Software Foundation (ASF) under one or more&#10;contributor license agreements.  See the NOTICE file distributed with&#10;this work for additional information regarding copyright ownership.&#10;The ASF licenses this file to You under the Apache License, Version 2.0&#10;(the &quot;License&quot;); you may not use this file except in compliance with&#10;the License.  You may obtain a copy of the License at&#10;&#10;    http://www.apache.org/licenses/LICENSE-2.0&#10;&#10;Unless required by applicable law or agreed to in writing, software&#10;distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;See the License for the specific language governing permissions and&#10;limitations under the License." />
+    </copyright>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/style/copyright/profiles_settings.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/style/copyright/profiles_settings.xml b/rocketmq-mysql/style/copyright/profiles_settings.xml
new file mode 100644
index 0000000..747c7e2
--- /dev/null
+++ b/rocketmq-mysql/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <settings default="Apache">
+        <module2copyright>
+            <element module="All" copyright="Apache"/>
+        </module2copyright>
+        <LanguageOptions name="GSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="HTML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JAVA">
+            <option name="fileTypeOverride" value="3" />
+            <option name="addBlankAfter" value="false" />
+        </LanguageOptions>
+        <LanguageOptions name="JSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSPX">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="MXML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="Properties">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="SPI">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="XML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="__TEMPLATE__">
+            <option name="separateBefore" value="true"/>
+            <option name="lenBefore" value="1"/>
+        </LanguageOptions>
+    </settings>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/style/rmq_checkstyle.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/style/rmq_checkstyle.xml b/rocketmq-mysql/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..2872eb7
--- /dev/null
+++ b/rocketmq-mysql/style/rmq_checkstyle.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--Refer to http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (b == true), b || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements, e.g. "if (valid()) return false; else return true;" instead of "return !valid();"-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Checks that the default comes after all the cases in a switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to a format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/style/rmq_codeStyle.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/style/rmq_codeStyle.xml b/rocketmq-mysql/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..7c7ce54
--- /dev/null
+++ b/rocketmq-mysql/style/rmq_codeStyle.xml
@@ -0,0 +1,143 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<code_scheme name="rocketmq">
+    <option name="USE_SAME_INDENTS" value="true"/>
+    <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
+    <option name="OTHER_INDENT_OPTIONS">
+        <value>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+            <option name="USE_TAB_CHARACTER" value="false"/>
+            <option name="SMART_TABS" value="false"/>
+            <option name="LABEL_INDENT_SIZE" value="0"/>
+            <option name="LABEL_INDENT_ABSOLUTE" value="false"/>
+            <option name="USE_RELATIVE_INDENTS" value="false"/>
+        </value>
+    </option>
+    <option name="PREFER_LONGER_NAMES" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE">
+        <value>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="true"/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_KEEP_INVALID_TAGS" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+    <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+    <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+    <option name="WHILE_ON_NEW_LINE" value="true"/>
+    <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+    <option name="ALIGN_MULTILINE_FOR" value="false"/>
+    <option name="SPACE_AFTER_TYPE_CAST" value="true"/>
+    <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+    <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+    <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+    <option name="LABELED_STATEMENT_WRAP" value="1"/>
+    <option name="WRAP_COMMENTS" value="true"/>
+    <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+    <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+    <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+    <JavaCodeStyleSettings>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+        <option name="INDENT_SIZE" value="2"/>
+    </ADDITIONAL_INDENT_OPTIONS>
+    <codeStyleSettings language="Groovy">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="HOCON">
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="JAVA">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+        <option name="LABELED_STATEMENT_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="JSON">
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="Scala">
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="XML">
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>
\ No newline at end of file


[2/6] incubator-rocketmq-externals git commit: RocketMQ-MySQL 1.0-snapshot closes apache/incubator-rocketmq-externals#24

Posted by vo...@apache.org.
RocketMQ-MySQL 1.0-snapshot closes apache/incubator-rocketmq-externals#24


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/5593575c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/5593575c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/5593575c

Branch: refs/heads/master
Commit: 5593575cbbeb4a6e92ba3d47caef0c7cd48e7278
Parents: aaa0758
Author: zhaoqun911 <91...@zhaoqun911.cn>
Authored: Wed Jun 14 20:36:26 2017 +0800
Committer: vongosling <vo...@alibaba-inc.com>
Committed: Wed Jun 14 20:36:26 2017 +0800

----------------------------------------------------------------------
 rocketmq-mysql/.gitignore                       |  14 +
 rocketmq-mysql/LICENSE-BIN                      | 279 ++++++++++++++++
 rocketmq-mysql/NOTICE-BIN                       |   5 +
 rocketmq-mysql/README.md                        |  41 ++-
 rocketmq-mysql/pom.xml                          | 275 ++++++++++++++++
 rocketmq-mysql/src/main/assembly/assembly.xml   |  61 ++++
 .../src/main/assembly/scripts/start.sh          |  23 ++
 .../src/main/assembly/scripts/stop.sh           |  18 ++
 .../java/org/apache/rocketmq/mysql/Config.java  | 130 ++++++++
 .../org/apache/rocketmq/mysql/Replicator.java   | 129 ++++++++
 .../apache/rocketmq/mysql/binlog/DataRow.java   |  79 +++++
 .../rocketmq/mysql/binlog/EventListener.java    |  46 +++
 .../rocketmq/mysql/binlog/EventProcessor.java   | 318 +++++++++++++++++++
 .../rocketmq/mysql/binlog/Transaction.java      |  91 ++++++
 .../rocketmq/mysql/offset/OffsetLogThread.java  |  49 +++
 .../rocketmq/mysql/position/BinlogPosition.java |  47 +++
 .../mysql/position/BinlogPositionManager.java   | 131 ++++++++
 .../mysql/productor/RocketMQProducer.java       |  59 ++++
 .../apache/rocketmq/mysql/schema/Database.java  | 108 +++++++
 .../apache/rocketmq/mysql/schema/Schema.java    | 128 ++++++++
 .../org/apache/rocketmq/mysql/schema/Table.java |  59 ++++
 .../mysql/schema/column/BigIntColumnParser.java |  50 +++
 .../mysql/schema/column/ColumnParser.java       |  76 +++++
 .../schema/column/DateTimeColumnParser.java     |  40 +++
 .../schema/column/DefaultColumnParser.java      |  37 +++
 .../mysql/schema/column/EnumColumnParser.java   |  46 +++
 .../mysql/schema/column/IntColumnParser.java    |  69 ++++
 .../mysql/schema/column/SetColumnParser.java    |  55 ++++
 .../mysql/schema/column/StringColumnParser.java |  57 ++++
 .../mysql/schema/column/TimeColumnParser.java   |  40 +++
 .../mysql/schema/column/YearColumnParser.java   |  40 +++
 rocketmq-mysql/src/main/resources/logback.xml   |  79 +++++
 .../src/main/resources/rocketmq_mysql.conf      |  28 ++
 .../rocketmq/mysql/BigIntColumnParserTest.java  |  38 +++
 .../rocketmq/mysql/EnumColumnParserTest.java    |  38 +++
 .../rocketmq/mysql/IntColumnParserTest.java     |  56 ++++
 .../rocketmq/mysql/SetColumnParserTest.java     |  38 +++
 rocketmq-mysql/style/copyright/Apache.xml       |  23 ++
 .../style/copyright/profiles_settings.xml       |  64 ++++
 rocketmq-mysql/style/rmq_checkstyle.xml         | 134 ++++++++
 rocketmq-mysql/style/rmq_codeStyle.xml          | 143 +++++++++
 41 files changed, 3240 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/.gitignore
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/.gitignore b/rocketmq-mysql/.gitignore
new file mode 100644
index 0000000..3311eab
--- /dev/null
+++ b/rocketmq-mysql/.gitignore
@@ -0,0 +1,14 @@
+*dependency-reduced-pom.xml
+.classpath
+.project
+.settings/
+target/
+devenv
+*.log*
+*.iml
+.idea/
+*.versionsBackup
+*bin
+!NOTICE-BIN
+!LICENSE-BIN
+.DS_Store
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/LICENSE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/LICENSE-BIN b/rocketmq-mysql/LICENSE-BIN
new file mode 100644
index 0000000..5d47613
--- /dev/null
+++ b/rocketmq-mysql/LICENSE-BIN
@@ -0,0 +1,279 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+
+------
+This product has a bundle logback, which is available under the EPL v1.0 License.
+The source code of logback can be found at https://github.com/qos-ch/logback.
+
+Logback LICENSE
+---------------
+
+Logback: the reliable, generic, fast and flexible logging framework.
+Copyright (C) 1999-2015, QOS.ch. All rights reserved.
+
+This program and the accompanying materials are dual-licensed under
+either the terms of the Eclipse Public License v1.0 as published by
+the Eclipse Foundation
+
+  or (per the licensee's choosing)
+
+under the terms of the GNU Lesser General Public License version 2.1
+as published by the Free Software Foundation.
+
+------
+This product has a bundle slf4j, which is available under the MIT License.
+The source code of slf4j can be found at https://github.com/qos-ch/slf4j.
+
+ Copyright (c) 2004-2017 QOS.ch
+ All rights reserved.
+
+ Permission is hereby granted, free  of charge, to any person obtaining
+ a  copy  of this  software  and  associated  documentation files  (the
+ "Software"), to  deal in  the Software without  restriction, including
+ without limitation  the rights to  use, copy, modify,  merge, publish,
+ distribute,  sublicense, and/or sell  copies of  the Software,  and to
+ permit persons to whom the Software  is furnished to do so, subject to
+ the following conditions:
+
+ The  above  copyright  notice  and  this permission  notice  shall  be
+ included in all copies or substantial portions of the Software.
+
+ THE  SOFTWARE IS  PROVIDED  "AS  IS", WITHOUT  WARRANTY  OF ANY  KIND,
+ EXPRESS OR  IMPLIED, INCLUDING  BUT NOT LIMITED  TO THE  WARRANTIES OF
+ MERCHANTABILITY,    FITNESS    FOR    A   PARTICULAR    PURPOSE    AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ OF CONTRACT, TORT OR OTHERWISE,  ARISING FROM, OUT OF OR IN CONNECTION
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+------
+This product has a bundle fastjson, which is available under the ASL2 License.
+The source code of fastjson can be found at https://github.com/alibaba/fastjson.
+
+ Copyright 1999-2016 Alibaba Group Holding Ltd.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+------
+This product has a bundle commons-codec, which is available under the ASL2 License.
+The source code of commons-codec can be found at http://svn.apache.org/viewvc/commons/proper/codec/trunk/.
+
+Apache Commons Codec
+Copyright 2002-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+contains test data from http://aspell.net/test/orig/batch0.tab.
+Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/NOTICE-BIN
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/NOTICE-BIN b/rocketmq-mysql/NOTICE-BIN
new file mode 100644
index 0000000..5384857
--- /dev/null
+++ b/rocketmq-mysql/NOTICE-BIN
@@ -0,0 +1,5 @@
+Apache RocketMQ (incubating)
+Copyright 2016-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/README.md b/rocketmq-mysql/README.md
index 8b47a0e..cd3b54c 100644
--- a/rocketmq-mysql/README.md
+++ b/rocketmq-mysql/README.md
@@ -4,6 +4,45 @@
 ## Overview
 ![overview](./doc/overview.png)
 
+RocketMQ-MySQL is a data replicator between MySQL and other systems.
+The replicator simulates a MySQL slave instance, parses the binlog events,
+and sends them to RocketMQ in JSON format; other systems can then consume the data from RocketMQ.
+With the RocketMQ-MySQL replicator, more systems can process data from the MySQL binlog
+in a simple and low-cost way.
+
+## Dataflow
+![dataflow](./doc/dataflow.png)
+
+* 1.First, get the last message from the queue and read the binlog position from it; if the queue is empty, use the latest binlog position of MySQL. The user can also specify this position explicitly;
+* 2.Send a binlog dump request to MySQL;
+* 3.MySQL pushes binlog events to the replicator; the replicator parses the data and accumulates it into a transaction object;
+* 4.Add the next position of the transaction to the transaction object and send it in JSON format to the queue;
+* 5.Record the binlog position and the queue offset of the latest transaction every second.
+
+
+## Quick Start
+
+* 1.Create an account with MySQL replication permission, which is used to simulate a MySQL slave and receive binlog events; the replication must be in row mode;
+* 2.Create a topic in RocketMQ to store binlog events; to ensure that downstream systems consume the data in order, the topic must have only one queue;
+* 3.Configure the MySQL and RocketMQ settings in the rocketmq_mysql.conf file;
+* 4.Execute "mvn install", and then start the replicator (execute "nohup ./start.sh &");
+* 5.Subscribe to and process the messages in your system.
+
+
+## Configuration Instruction
+|key               |nullable|default    |description|
+|------------------|--------|-----------|-----------|
+|mysqlAddr         |false   |           |MySQL address|
+|mysqlPort         |false   |           |MySQL port|
+|mysqlUsername     |false   |           |username of MySQL account|
+|mysqlPassword     |false   |           |password of MySQL account|
+|mqNamesrvAddr     |false   |           |RocketMQ name server address (e.g.,127.0.0.1:9876)|
+|mqTopic           |false   |           |RocketMQ topic name|
+|startType         |true    |NEW_EVENT  |The way the replicator starts processing data; there are three options:<br>- NEW_EVENT: starts processing data from the tail of the binlog<br>- LAST_PROCESSED: starts processing data from the last processed event<br>- SPECIFIED: starts processing data from the position that the user specified; if you choose this option, binlogFilename and nextPosition must not be null|
+|binlogFilename    |true    |           |If "startType" is "SPECIFIED",the replicator will begin to replicate from this binlog file|
+|nextPosition      |true    |           |If "startType" is "SPECIFIED",the replicator will begin to replicate from this position|
+|maxTransactionRows|true    |100        |max rows of the transaction pushed to RocketMQ|
+
 The RocketMQ-MySQL is a data replicator between MySQL and other system,the replicator parse the binglog event and send it in json format to the RocketMQ,and other system can pull data from RocketMQ.
 ## Dataflow
 ![dataflow](./doc/dataflow.png)
@@ -12,4 +51,4 @@ The RocketMQ-MySQL is a data replicator between MySQL and other system,the repli
 * 2.Send a binlog dump request to the MySQL;
 * 3.The MySQL push binglog event to the Replicator,the Replicator parse the data and accumulate as a transaction-object;
 * 4.Add the next-position of the transaction to the transaction-object and send it in json format to the queue;
-* 5.Record the binglog position and the offset in the queue of the latest transaction every second.
\ No newline at end of file
+* 5.Record the binlog position and the offset in the queue of the latest transaction every second.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/pom.xml b/rocketmq-mysql/pom.xml
new file mode 100644
index 0000000..8603bdd
--- /dev/null
+++ b/rocketmq-mysql/pom.xml
@@ -0,0 +1,275 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache</groupId>
+    <artifactId>rocketmq-mysql-replicator</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+
+    <scm>
+        <url>https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</url>
+        <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git</connection>
+        <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals.git
+        </developerConnection>
+        <tag>HEAD</tag>
+    </scm>
+
+    <mailingLists>
+        <mailingList>
+            <name>Development List</name>
+            <subscribe>dev-subscribe@rocketmq.incubator.apache.org</subscribe>
+            <unsubscribe>dev-unsubscribe@rocketmq.incubator.apache.org</unsubscribe>
+            <post>dev@rocketmq.incubator.apache.org</post>
+        </mailingList>
+        <mailingList>
+            <name>User List</name>
+            <subscribe>users-subscribe@rocketmq.incubator.apache.org</subscribe>
+            <unsubscribe>users-unsubscribe@rocketmq.incubator.apache.org</unsubscribe>
+            <post>users@rocketmq.incubator.apache.org</post>
+        </mailingList>
+        <mailingList>
+            <name>Commits List</name>
+            <subscribe>commits-subscribe@rocketmq.incubator.apache.org</subscribe>
+            <unsubscribe>commits-unsubscribe@rocketmq.incubator.apache.org</unsubscribe>
+            <post>commits@rocketmq.incubator.apache.org</post>
+        </mailingList>
+    </mailingLists>
+
+    <developers>
+        <developer>
+            <id>Apache RocketMQ</id>
+            <name>Apache RocketMQ of ASF</name>
+            <url>https://rocketmq.apache.org/</url>
+        </developer>
+    </developers>
+
+    <licenses>
+        <license>
+            <name>Apache License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <organization>
+        <name>Apache Software Foundation</name>
+        <url>http://www.apache.org</url>
+    </organization>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <maven.test.skip>false</maven.test.skip>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <maven.compiler.source>1.7</maven.compiler.source>
+        <maven.compiler.target>1.7</maven.compiler.target>
+        <rocketmq.version>4.0.0-incubating</rocketmq.version>
+    </properties>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.zendesk</groupId>
+            <artifactId>open-replicator</artifactId>
+            <version>1.6.0</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.39</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.29</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.9</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>rocketmq-mysql</finalName>
+        <sourceDirectory>${project.basedir}/src/main/java</sourceDirectory>
+        <outputDirectory>${project.basedir}/target/classes</outputDirectory>
+        <resources>
+            <resource>
+                <directory>${project.basedir}/src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.8</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.4.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce-ban-circular-dependencies</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <rules>
+                        <banCircularDependencies/>
+                    </rules>
+                    <fail>true</fail>
+                </configuration>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.codehaus.mojo</groupId>
+                        <artifactId>extra-enforcer-rules</artifactId>
+                        <version>1.0-beta-6</version>
+                    </dependency>
+                </dependencies>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>attach-javadocs</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <archive>
+                        <addMavenDescriptor>false</addMavenDescriptor>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathPrefix>lib/</classpathPrefix>
+                        </manifest>
+                    </archive>
+                    <excludes>
+                        <exclude>rocketmq_mysql.conf</exclude>
+                        <exclude>logback.xml</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/assembly.xml</descriptor>
+                    </descriptors>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/assembly.xml b/rocketmq-mysql/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..b280aa6
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/assembly.xml
@@ -0,0 +1,61 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!--
+  Assembly descriptor for the binary distribution. It produces a tar.gz and an
+  exploded directory with no base directory, laid out as:
+    lib/   project artifact plus dependencies
+    bin/   scripts copied from src/main/assembly/scripts
+    conf/  *.conf and logback.xml taken from target/classes
+  LICENSE-BIN and NOTICE-BIN are shipped as LICENSE and NOTICE.
+-->
+<assembly
+	xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+	<id>pack</id>
+	<formats>
+		<format>tar.gz</format>
+		<format>dir</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<dependencySets>
+		<dependencySet>
+			<useProjectArtifact>true</useProjectArtifact>
+			<outputDirectory>lib</outputDirectory>
+		</dependencySet>
+	</dependencySets>
+	<fileSets>
+		<!-- Launcher scripts: must be executable. -->
+		<fileSet>
+			<directory>src/main/assembly/scripts</directory>
+			<outputDirectory>bin</outputDirectory>
+			<fileMode>0755</fileMode>
+		</fileSet>
+		<!-- Configuration files are plain data, so they are not marked executable. -->
+		<fileSet>
+			<directory>target/classes</directory>
+			<outputDirectory>conf</outputDirectory>
+			<fileMode>0644</fileMode>
+			<includes>
+				<include>*.conf</include>
+				<include>logback.xml</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+
+	<files>
+		<file>
+			<source>LICENSE-BIN</source>
+			<destName>LICENSE</destName>
+		</file>
+		<file>
+			<source>NOTICE-BIN</source>
+			<destName>NOTICE</destName>
+		</file>
+	</files>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/assembly/scripts/start.sh
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/scripts/start.sh b/rocketmq-mysql/src/main/assembly/scripts/start.sh
new file mode 100644
index 0000000..e159f36
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/scripts/start.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+
+binPath=$(cd "$(dirname "$0")"; pwd);
+cd $binPath
+cd ..
+parentPath=`pwd`
+libPath=$parentPath/lib/
+
+
+function exportClassPath(){
+    jarFileList=`find "$libPath" -name *.jar |awk -F'/' '{print $(NF)}' 2>>/dev/null`
+    CLASSPATH=".:$binPath";
+    for jarItem in $jarFileList
+      do
+        CLASSPATH="$CLASSPATH:$libPath$jarItem"
+    done
+    CLASSPATH=$CLASSPATH:./conf
+    export CLASSPATH
+}
+ulimit -n 65535
+exportClassPath
+
+java -server -Xms512m -Xmx512m -Xss2m -XX:NewRatio=2  -XX:+UseGCOverheadLimit -XX:-UseParallelGC -XX:ParallelGCThreads=24 org.apache.rocketmq.mysql.Replicator

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/assembly/scripts/stop.sh
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/assembly/scripts/stop.sh b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
new file mode 100755
index 0000000..f0e3c0d
--- /dev/null
+++ b/rocketmq-mysql/src/main/assembly/scripts/stop.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+
+# Sends the default kill signal to every process whose command line contains
+# the Replicator main class.
+
+PROGRAM_NAME="org.apache.rocketmq.mysql.Replicator"
+# -F matches the class name literally; without it the dots act as regex wildcards.
+PIDS=`ps -ef | grep -F "$PROGRAM_NAME" | grep -v "grep" | awk '{print $2}'`
+
+# Quoted: with several PIDs an unquoted test fails with "too many arguments".
+if [ -z "$PIDS" ]; then
+    echo "No this process."
+    # Nothing to stop, so do not go on to report a successful stop.
+    exit 0
+else
+    echo "Find process is $PIDS."
+fi
+
+#####kill####
+echo -e "Stopping the $PROGRAM_NAME...\c"
+# Intentionally unquoted: one loop iteration per PID.
+for PID in $PIDS ; do
+    kill  $PID
+done
+
+echo "SUCCESS!"

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
new file mode 100644
index 0000000..0705946
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Config.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Method;
+import java.util.Properties;
+
+
+public class Config {
+
+    public String mysqlAddr;
+    public Integer mysqlPort;
+    public String mysqlUsername;
+    public String mysqlPassword;
+
+    public String mqNamesrvAddr;
+    public String mqTopic;
+
+    public String startType = "NEW_EVENT";
+    public String binlogFilename;
+    public Long nextPosition;
+    public Integer maxTransactionRows = 100;
+
+    public void load() throws IOException {
+
+        InputStream in = Config.class.getClassLoader().getResourceAsStream("rocketmq_mysql.conf");
+        Properties properties = new Properties();
+        properties.load(in);
+
+        properties2Object(properties, this);
+
+    }
+
+    private void properties2Object(final Properties p, final Object object) {
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getProperty(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg = null;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    public void setMysqlAddr(String mysqlAddr) {
+        this.mysqlAddr = mysqlAddr;
+    }
+
+    public void setMysqlPort(Integer mysqlPort) {
+        this.mysqlPort = mysqlPort;
+    }
+
+    public void setMysqlUsername(String mysqlUsername) {
+        this.mysqlUsername = mysqlUsername;
+    }
+
+    public void setMysqlPassword(String mysqlPassword) {
+        this.mysqlPassword = mysqlPassword;
+    }
+
+    public void setBinlogFilename(String binlogFilename) {
+        this.binlogFilename = binlogFilename;
+    }
+
+    public void setNextPosition(Long nextPosition) {
+        this.nextPosition = nextPosition;
+    }
+
+    public void setMaxTransactionRows(Integer maxTransactionRows) {
+        this.maxTransactionRows = maxTransactionRows;
+    }
+
+    public void setMqNamesrvAddr(String mqNamesrvAddr) {
+        this.mqNamesrvAddr = mqNamesrvAddr;
+    }
+
+    public void setMqTopic(String mqTopic) {
+        this.mqTopic = mqTopic;
+    }
+
+    public void setStartType(String startType) {
+        this.startType = startType;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
new file mode 100644
index 0000000..b358567
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/Replicator.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql;
+
+import org.apache.rocketmq.mysql.binlog.EventProcessor;
+import org.apache.rocketmq.mysql.binlog.Transaction;
+import org.apache.rocketmq.mysql.offset.OffsetLogThread;
+import org.apache.rocketmq.mysql.productor.RocketMQProducer;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point of the replicator. It loads the configuration, starts the
+ * RocketMQ producer, the offset logging thread and the binlog event processor,
+ * and records the binlog position and queue offset of the last completed
+ * transaction that was pushed.
+ */
+public class Replicator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Replicator.class);
+
+    private static final Logger OFFSET_LOGGER = LoggerFactory.getLogger("OffsetLogger");
+
+    /** Number of times {@link #commit} tries to push one transaction. */
+    private static final int MAX_PUSH_ATTEMPTS = 3;
+
+    private Config config;
+
+    private EventProcessor eventProcessor;
+
+    private RocketMQProducer rocketMQProducer;
+
+    // Guards xid, nextBinlogPosition and nextQueueOffset so they are read and
+    // written as one consistent triple.
+    private final Object lock = new Object();
+    private BinlogPosition nextBinlogPosition;
+    private long nextQueueOffset;
+    private long xid;
+
+    public static void main(String[] args) {
+
+        Replicator replicator = new Replicator();
+        replicator.start();
+    }
+
+    /**
+     * Loads the configuration and starts the producer, the offset log thread
+     * and the event processor, in that order. Any exception is logged and the
+     * JVM exits with status 1.
+     */
+    public void start() {
+
+        try {
+            config = new Config();
+            config.load();
+
+            rocketMQProducer = new RocketMQProducer(config);
+            rocketMQProducer.start();
+
+            OffsetLogThread offsetLogThread = new OffsetLogThread(this);
+            offsetLogThread.start();
+
+            eventProcessor = new EventProcessor(this);
+            eventProcessor.start();
+
+        } catch (Exception e) {
+            LOGGER.error("Start error.", e);
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Pushes a transaction to RocketMQ as JSON, trying up to
+     * {@value #MAX_PUSH_ATTEMPTS} times.
+     *
+     * @param transaction the rows to publish
+     * @param isComplete  {@code true} when the transaction is finished; only then
+     *                    are the xid, next binlog position and queue offset recorded
+     */
+    public void commit(Transaction transaction, boolean isComplete) {
+
+        String json = transaction.toJson();
+
+        for (int i = 0; i < MAX_PUSH_ATTEMPTS; i++) {
+            try {
+                if (isComplete) {
+                    long offset = rocketMQProducer.push(json);
+
+                    synchronized (lock) {
+                        xid = transaction.getXid();
+                        nextBinlogPosition = transaction.getNextBinlogPosition();
+                        nextQueueOffset = offset;
+                    }
+
+                } else {
+                    rocketMQProducer.push(json);
+                }
+                return;
+
+            } catch (Exception e) {
+                LOGGER.error("Push error,retry:" + (i + 1) + ",", e);
+            }
+        }
+
+        // Every attempt failed: the data is not retried again, so say so explicitly.
+        LOGGER.error("Push failed after {} attempts, transaction discarded.", MAX_PUSH_ATTEMPTS);
+    }
+
+    /**
+     * Writes the last recorded xid, binlog file, next binlog position and next
+     * queue offset to the offset logger. Logs nothing until a complete
+     * transaction has been committed.
+     */
+    public void logOffset() {
+
+        String binlogFilename = null;
+        long xid = 0L;
+        long nextPosition = 0L;
+        long nextOffset = 0L;
+
+        // Copy under the lock, log outside it.
+        synchronized (lock) {
+            if (nextBinlogPosition != null) {
+                xid = this.xid;
+                binlogFilename = nextBinlogPosition.getBinlogFilename();
+                nextPosition = nextBinlogPosition.getPosition();
+                nextOffset = nextQueueOffset;
+            }
+        }
+
+        if (binlogFilename != null) {
+            OFFSET_LOGGER.info("XID: {},   BINLOG_FILE: {},   NEXT_POSITION: {},   NEXT_OFFSET: {}",
+                xid, binlogFilename, nextPosition, nextOffset);
+        }
+
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+    /**
+     * Returns the binlog position recorded by the last complete commit, or
+     * {@code null} if none has been recorded yet.
+     */
+    public BinlogPosition getNextBinlogPosition() {
+        // Read under the same lock that commit() writes under.
+        synchronized (lock) {
+            return nextBinlogPosition;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
new file mode 100644
index 0000000..3d9789f
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/DataRow.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.google.code.or.common.glossary.Column;
+import com.google.code.or.common.glossary.Row;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.mysql.schema.Table;
+import org.apache.rocketmq.mysql.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataRow {
+
+    private Logger logger = LoggerFactory.getLogger(DataRow.class);
+
+    private String type;
+    private Table table;
+    private Row row;
+
+    public DataRow(String type, Table table, Row row) {
+        this.type = type;
+        this.table = table;
+        this.row = row;
+    }
+
+    public Map toMap() {
+
+        try {
+            if (table.getColList().size() == row.getColumns().size()) {
+                Map<String, Object> dataMap = new HashMap<>();
+                List<String> keyList = table.getColList();
+                List<ColumnParser> parserList = table.getParserList();
+                List<Column> valueList = row.getColumns();
+
+                for (int i = 0; i < keyList.size(); i++) {
+                    Object value = valueList.get(i).getValue();
+                    ColumnParser parser = parserList.get(i);
+                    dataMap.put(keyList.get(i), parser.getValue(value));
+                }
+
+                Map<String, Object> map = new HashMap<>();
+                map.put("database", table.getDatabase());
+                map.put("table", table.getName());
+                map.put("type", type);
+                map.put("data", dataMap);
+
+                return map;
+            } else {
+                logger.error("Table schema changed,discard data: {} - {}, {}  {}",
+                    table.getDatabase().toUpperCase(), table.getName().toUpperCase(), type, row.toString());
+
+                return null;
+            }
+        } catch (Exception e) {
+            logger.error("Row parse error,discard data: {} - {}, {}  {}",
+                table.getDatabase().toUpperCase(), table.getName().toUpperCase(), type, row.toString());
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
new file mode 100644
index 0000000..ecc632e
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventListener.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.google.code.or.binlog.BinlogEventListener;
+import com.google.code.or.binlog.BinlogEventV4;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Binlog listener that hands every received event to a blocking queue.
+ */
+public class EventListener implements BinlogEventListener {
+
+    private final BlockingQueue<BinlogEventV4> queue;
+
+    /**
+     * @param queue queue that receives each event passed to {@link #onEvents}
+     */
+    public EventListener(BlockingQueue<BinlogEventV4> queue) {
+        this.queue = queue;
+    }
+
+    /**
+     * Offers the event to the queue in 100 ms attempts until it is accepted.
+     * If the thread is interrupted while waiting, the event is not enqueued
+     * and the interrupt status is restored for the caller.
+     */
+    @Override
+    public void onEvents(BinlogEventV4 event) {
+
+        try {
+            while (true) {
+                if (queue.offer(event, 100, TimeUnit.MILLISECONDS)) {
+                    return;
+                }
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            // offer() cleared the flag when it threw; set it again so the
+            // calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
new file mode 100644
index 0000000..f937b6d
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.alibaba.druid.pool.DruidDataSourceFactory;
+import com.google.code.or.OpenReplicator;
+import com.google.code.or.binlog.BinlogEventV4;
+import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
+import com.google.code.or.binlog.impl.event.DeleteRowsEventV2;
+import com.google.code.or.binlog.impl.event.QueryEvent;
+import com.google.code.or.binlog.impl.event.TableMapEvent;
+import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
+import com.google.code.or.binlog.impl.event.UpdateRowsEventV2;
+import com.google.code.or.binlog.impl.event.WriteRowsEvent;
+import com.google.code.or.binlog.impl.event.WriteRowsEventV2;
+import com.google.code.or.binlog.impl.event.XidEvent;
+import com.google.code.or.common.glossary.Pair;
+import com.google.code.or.common.glossary.Row;
+import com.google.code.or.common.util.MySQLConstants;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Pattern;
+import javax.sql.DataSource;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.mysql.Replicator;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.apache.rocketmq.mysql.position.BinlogPositionManager;
+import org.apache.rocketmq.mysql.schema.Schema;
+import org.apache.rocketmq.mysql.schema.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumes binlog events from an {@link OpenReplicator}, groups row changes
+ * into {@link Transaction}s and hands them to the {@link Replicator} for
+ * publishing.
+ */
+public class EventProcessor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
+
+    /** Matches SQL that starts with CREATE TABLE or ALTER TABLE, ignoring case. */
+    private static final Pattern TABLE_DDL_PATTERN =
+        Pattern.compile("^(CREATE|ALTER)\\s+TABLE", Pattern.CASE_INSENSITIVE);
+
+    private Replicator replicator;
+    private Config config;
+
+    private DataSource dataSource;
+
+    private BinlogPositionManager binlogPositionManager;
+
+    // Bounded hand-off between the listener and doProcess(); capacity 100.
+    private BlockingQueue<BinlogEventV4> queue = new LinkedBlockingQueue<>(100);
+
+    private OpenReplicator openReplicator;
+
+    private EventListener eventListener;
+
+    private Schema schema;
+
+    // Table id from the latest TABLE_MAP event -> table schema (may map to null).
+    private Map<Long, Table> tableMap = new HashMap<>();
+
+    // Transaction currently being filled; created lazily.
+    private Transaction transaction;
+
+    public EventProcessor(Replicator replicator) {
+
+        this.replicator = replicator;
+        this.config = replicator.getConfig();
+    }
+
+    /**
+     * Creates the data source, resolves the starting binlog position, loads the
+     * schema, starts the OpenReplicator and then processes events. The call to
+     * {@code doProcess()} loops forever, so this method does not return normally.
+     *
+     * @throws Exception if any initialisation step fails
+     */
+    public void start() throws Exception {
+
+        initDataSource();
+
+        binlogPositionManager = new BinlogPositionManager(config, dataSource);
+        binlogPositionManager.initBeginPosition();
+
+        schema = new Schema(dataSource);
+        schema.load();
+
+        eventListener = new EventListener(queue);
+        openReplicator = new OpenReplicator();
+        openReplicator.setBinlogEventListener(eventListener);
+        openReplicator.setHost(config.mysqlAddr);
+        openReplicator.setPort(config.mysqlPort);
+        openReplicator.setUser(config.mysqlUsername);
+        openReplicator.setPassword(config.mysqlPassword);
+        openReplicator.setStopOnEOF(false);
+        openReplicator.setHeartbeatPeriod(1f);
+        openReplicator.setLevel2BufferSize(50 * 1024 * 1024);
+        openReplicator.setServerId(1001);
+        openReplicator.setBinlogFileName(binlogPositionManager.getBinlogFilename());
+        openReplicator.setBinlogPosition(binlogPositionManager.getPosition());
+        openReplicator.start();
+
+        LOGGER.info("Started.");
+
+        doProcess();
+    }
+
+    /**
+     * Takes events from the queue one at a time and dispatches them by event
+     * type. Event types without a case are ignored. A failure while handling
+     * an event is logged and the loop continues.
+     */
+    private void doProcess() {
+
+        while (true) {
+
+            try {
+                BinlogEventV4 event = queue.take();
+
+                switch (event.getHeader().getEventType()) {
+                    case MySQLConstants.TABLE_MAP_EVENT:
+                        processTableMapEvent(event);
+                        break;
+
+                    case MySQLConstants.WRITE_ROWS_EVENT:
+                        processWriteEvent(event);
+                        break;
+
+                    case MySQLConstants.WRITE_ROWS_EVENT_V2:
+                        processWriteEventV2(event);
+                        break;
+
+                    case MySQLConstants.UPDATE_ROWS_EVENT:
+                        processUpdateEvent(event);
+                        break;
+
+                    case MySQLConstants.UPDATE_ROWS_EVENT_V2:
+                        processUpdateEventV2(event);
+                        break;
+
+                    case MySQLConstants.DELETE_ROWS_EVENT:
+                        processDeleteEvent(event);
+                        break;
+
+                    case MySQLConstants.DELETE_ROWS_EVENT_V2:
+                        processDeleteEventV2(event);
+                        break;
+
+                    case MySQLConstants.QUERY_EVENT:
+                        processQueryEvent(event);
+                        break;
+
+                    case MySQLConstants.XID_EVENT:
+                        processXidEvent(event);
+                        break;
+
+                    default:
+                        break;
+                }
+            } catch (Exception e) {
+                LOGGER.error("Binlog process error.", e);
+            }
+
+        }
+    }
+
+    /** Remembers which table schema the event's table id refers to. */
+    private void processTableMapEvent(BinlogEventV4 event) {
+
+        TableMapEvent tableMapEvent = (TableMapEvent) event;
+        String dbName = tableMapEvent.getDatabaseName().toString();
+        String tableName = tableMapEvent.getTableName().toString();
+        Long tableId = tableMapEvent.getTableId();
+
+        Table table = schema.getTable(dbName, tableName);
+
+        tableMap.put(tableId, table);
+    }
+
+    private void processWriteEvent(BinlogEventV4 event) {
+
+        WriteRowsEvent writeRowsEvent = (WriteRowsEvent) event;
+        addRows("WRITE", writeRowsEvent.getTableId(), writeRowsEvent.getRows());
+    }
+
+    private void processWriteEventV2(BinlogEventV4 event) {
+
+        WriteRowsEventV2 writeRowsEventV2 = (WriteRowsEventV2) event;
+        addRows("WRITE", writeRowsEventV2.getTableId(), writeRowsEventV2.getRows());
+    }
+
+    private void processUpdateEvent(BinlogEventV4 event) {
+
+        UpdateRowsEvent updateRowsEvent = (UpdateRowsEvent) event;
+        addUpdatedRows(updateRowsEvent.getTableId(), updateRowsEvent.getRows());
+    }
+
+    private void processUpdateEventV2(BinlogEventV4 event) {
+
+        UpdateRowsEventV2 updateRowsEventV2 = (UpdateRowsEventV2) event;
+        addUpdatedRows(updateRowsEventV2.getTableId(), updateRowsEventV2.getRows());
+    }
+
+    private void processDeleteEvent(BinlogEventV4 event) {
+
+        DeleteRowsEvent deleteRowsEvent = (DeleteRowsEvent) event;
+        addRows("DELETE", deleteRowsEvent.getTableId(), deleteRowsEvent.getRows());
+    }
+
+    private void processDeleteEventV2(BinlogEventV4 event) {
+
+        DeleteRowsEventV2 deleteRowsEventV2 = (DeleteRowsEventV2) event;
+        addRows("DELETE", deleteRowsEventV2.getTableId(), deleteRowsEventV2.getRows());
+    }
+
+    /** Adds every row of a write or delete event under the given type label. */
+    private void addRows(String type, Long tableId, List<Row> rows) {
+
+        for (Row row : rows) {
+            addRow(type, tableId, row);
+        }
+    }
+
+    /** Adds the after-image of every updated row under the "UPDATE" label. */
+    private void addUpdatedRows(Long tableId, List<Pair<Row>> pairs) {
+
+        for (Pair<Row> pair : pairs) {
+            addRow("UPDATE", tableId, pair.getAfter());
+        }
+    }
+
+    /** Resets the cached schema when the statement is a CREATE TABLE or ALTER TABLE. */
+    private void processQueryEvent(BinlogEventV4 event) {
+
+        QueryEvent queryEvent = (QueryEvent) event;
+        String sql = queryEvent.getSql().toString();
+
+        if (TABLE_DDL_PATTERN.matcher(sql).find()) {
+            schema.reset();
+        }
+    }
+
+    /**
+     * Closes the current transaction: stamps it with the event's xid and next
+     * binlog position, commits it as complete and starts a fresh transaction.
+     */
+    private void processXidEvent(BinlogEventV4 event) {
+
+        XidEvent xidEvent = (XidEvent) event;
+        String binlogFilename = xidEvent.getBinlogFilename();
+        Long position = xidEvent.getHeader().getNextPosition();
+        Long xid = xidEvent.getXid();
+
+        // An XID event can arrive before any row event has created a
+        // transaction; create one lazily (as addRow does) instead of failing
+        // with a NullPointerException.
+        if (transaction == null) {
+            transaction = new Transaction(this);
+        }
+
+        BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position);
+        transaction.setNextBinlogPosition(binlogPosition);
+        transaction.setXid(xid);
+
+        replicator.commit(transaction, true);
+
+        transaction = new Transaction(this);
+
+    }
+
+    /**
+     * Appends a row to the current transaction. Rows whose table id has no
+     * known table are ignored. When the transaction refuses the row, it is
+     * committed as incomplete and the row goes into a new transaction.
+     */
+    private void addRow(String type, Long tableId, Row row) {
+
+        if (transaction == null) {
+            transaction = new Transaction(this);
+        }
+
+        Table t = tableMap.get(tableId);
+        if (t != null) {
+
+            while (true) {
+                if (transaction.addRow(type, t, row)) {
+                    break;
+
+                } else {
+                    // Partial flush: carry the last recorded position forward.
+                    transaction.setNextBinlogPosition(replicator.getNextBinlogPosition());
+                    replicator.commit(transaction, false);
+                    transaction = new Transaction(this);
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Builds the Druid data source for the configured MySQL server.
+     *
+     * @throws Exception if the data source cannot be created
+     */
+    private void initDataSource() throws Exception {
+
+        Map<String, String> map = new HashMap<>();
+        map.put("driverClassName", "com.mysql.jdbc.Driver");
+        map.put("url", "jdbc:mysql://" + config.mysqlAddr + ":" + config.mysqlPort + "?useSSL=true&verifyServerCertificate=false");
+        map.put("username", config.mysqlUsername);
+        map.put("password", config.mysqlPassword);
+        map.put("initialSize", "2");
+        map.put("maxActive", "2");
+        map.put("maxWait", "60000");
+        map.put("timeBetweenEvictionRunsMillis", "60000");
+        map.put("minEvictableIdleTimeMillis", "300000");
+        map.put("validationQuery", "SELECT 1 FROM DUAL");
+        map.put("testWhileIdle", "true");
+
+        dataSource = DruidDataSourceFactory.createDataSource(map);
+    }
+
+    public Config getConfig() {
+        return config;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
new file mode 100644
index 0000000..16aa06f
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/Transaction.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.binlog;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.code.or.common.glossary.Row;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.mysql.position.BinlogPosition;
+import org.apache.rocketmq.mysql.schema.Table;
+
+/**
+ * A bounded batch of row changes that can be rendered as one JSON document
+ * together with its xid and the binlog position that follows it.
+ */
+public class Transaction {
+
+    private EventProcessor eventProcessor;
+    private Config config;
+
+    /** Rows collected so far, in insertion order. */
+    private List<DataRow> dataRows = new LinkedList<>();
+
+    private Long xid;
+    private BinlogPosition nextBinlogPosition;
+
+    /**
+     * Creates an empty transaction that takes its configuration from the
+     * given processor.
+     *
+     * @param eventProcessor processor whose {@code getConfig()} supplies the config
+     */
+    public Transaction(EventProcessor eventProcessor) {
+        this.eventProcessor = eventProcessor;
+        this.config = eventProcessor.getConfig();
+    }
+
+    /**
+     * Adds a row unless the transaction already holds
+     * {@code config.maxTransactionRows} rows.
+     *
+     * @param type  row event type
+     * @param table table the row belongs to
+     * @param row   the row data
+     * @return {@code true} if the row was stored, {@code false} if the
+     *         transaction is full and the row was not stored
+     */
+    public boolean addRow(String type, Table table, Row row) {
+        if (dataRows.size() == config.maxTransactionRows) {
+            return false;
+        }
+
+        dataRows.add(new DataRow(type, table, row));
+        return true;
+    }
+
+    /**
+     * Serializes the transaction to JSON with the keys {@code xid},
+     * {@code binlogFilename}, {@code nextPosition} and {@code rows}.
+     * Rows whose {@code toMap()} yields {@code null} are left out.
+     *
+     * @return the JSON text
+     */
+    public String toJson() {
+
+        List<Map> rows = new LinkedList<>();
+        for (DataRow dataRow : dataRows) {
+            Map rowMap = dataRow.toMap();
+            if (rowMap == null) {
+                continue;
+            }
+            rows.add(rowMap);
+        }
+
+        Map<String, Object> json = new HashMap<>();
+        json.put("xid", xid);
+        json.put("binlogFilename", nextBinlogPosition.getBinlogFilename());
+        json.put("nextPosition", nextBinlogPosition.getPosition());
+        json.put("rows", rows);
+
+        return JSONObject.toJSONString(json);
+    }
+
+    /** @return the transaction id, or {@code null} if none was set */
+    public Long getXid() {
+        return xid;
+    }
+
+    /** @param xid the transaction id to record */
+    public void setXid(Long xid) {
+        this.xid = xid;
+    }
+
+    /** @return the binlog position following this transaction, or {@code null} if none was set */
+    public BinlogPosition getNextBinlogPosition() {
+        return nextBinlogPosition;
+    }
+
+    /** @param nextBinlogPosition the binlog position following this transaction */
+    public void setNextBinlogPosition(BinlogPosition nextBinlogPosition) {
+        this.nextBinlogPosition = nextBinlogPosition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java
new file mode 100644
index 0000000..40468af
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/offset/OffsetLogThread.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.offset;
+
+import org.apache.rocketmq.mysql.Replicator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OffsetLogThread extends Thread {
+
+    private Logger logger = LoggerFactory.getLogger(OffsetLogThread.class);
+
+    private Replicator replicator;
+
+    public OffsetLogThread(Replicator replicator) {
+        this.replicator = replicator;
+        setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+
+        while (true) {
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.error("Offset thread interrupted.", e);
+            }
+
+            replicator.logOffset();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
new file mode 100644
index 0000000..5ba436c
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPosition.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+/**
+ * Mutable holder for a binlog file name and a position.
+ */
+public class BinlogPosition {
+
+    private String binlogFilename;
+    private Long position;
+
+    /**
+     * @param binlogFilename name of the binlog file
+     * @param position       position to store alongside the file name
+     */
+    public BinlogPosition(String binlogFilename, Long position) {
+        this.binlogFilename = binlogFilename;
+        this.position = position;
+    }
+
+    /** @return the stored binlog file name */
+    public String getBinlogFilename() {
+        return this.binlogFilename;
+    }
+
+    /** @return the stored position */
+    public Long getPosition() {
+        return this.position;
+    }
+
+    /** @param binlogFilename the binlog file name to store */
+    public void setBinlogFilename(String binlogFilename) {
+        this.binlogFilename = binlogFilename;
+    }
+
+    /** @param position the position to store */
+    public void setPosition(Long position) {
+        this.position = position;
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
new file mode 100644
index 0000000..67e8d9e
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/position/BinlogPositionManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.position;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.mysql.Config;
+
+public class BinlogPositionManager {
+
+    private DataSource dataSource;
+    private Config config;
+
+    private String binlogFilename;
+    private Long nextPosition;
+
+    public BinlogPositionManager(Config config, DataSource dataSource) {
+        this.config = config;
+        this.dataSource = dataSource;
+    }
+
+    public void initBeginPosition() throws Exception {
+
+        if (config.startType == null || config.startType.equals("NEW_EVENT")) {
+
+            initPositionFromBinlogTail();
+        } else if (config.startType.equals("LAST_PROCESSED")) {
+
+            initPositionFromMqTail();
+        } else if (config.startType.equals("SPECIFIED")) {
+
+            binlogFilename = config.binlogFilename;
+            nextPosition = config.nextPosition;
+        }
+
+        if (binlogFilename == null || nextPosition == null) {
+            throw new Exception("binlogFilename | nextPosition is null.");
+        }
+    }
+
+    private void initPositionFromMqTail() throws Exception {
+
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("BINLOG_CONSUMER_GROUP");
+        consumer.setNamesrvAddr(config.mqNamesrvAddr);
+        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
+        consumer.start();
+
+        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(config.mqTopic);
+        MessageQueue queue = queues.iterator().next();
+
+        if (queue != null) {
+            Long offset = consumer.maxOffset(queue);
+            if (offset > 0)
+                offset--;
+
+            PullResult pullResult = consumer.pull(queue, "*", offset, 100);
+
+            if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                MessageExt msg = pullResult.getMsgFoundList().get(0);
+                String json = new String(msg.getBody(), "UTF-8");
+
+                JSONObject js = JSON.parseObject(json);
+                binlogFilename = (String) js.get("binlogFilename");
+                nextPosition = js.getLong("nextPosition");
+            }
+        }
+
+    }
+
+    private void initPositionFromBinlogTail() throws SQLException {
+
+        String sql = "SHOW MASTER STATUS";
+
+        Connection conn = null;
+        ResultSet rs = null;
+
+        try {
+            Connection connection = dataSource.getConnection();
+            rs = connection.createStatement().executeQuery(sql);
+
+            while (rs.next()) {
+                binlogFilename = rs.getString("File");
+                nextPosition = rs.getLong("Position");
+            }
+
+        } finally {
+
+            if (conn != null) {
+                conn.close();
+            }
+            if (rs != null) {
+                rs.close();
+            }
+        }
+
+    }
+
+    public String getBinlogFilename() {
+        return binlogFilename;
+    }
+
+    public Long getPosition() {
+        return nextPosition;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
new file mode 100644
index 0000000..fb4eb11
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/productor/RocketMQProducer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.productor;
+
+import java.io.UnsupportedEncodingException;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.mysql.Config;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thin wrapper around a {@link DefaultMQProducer} that sends JSON strings to
+ * the topic named by {@code config.mqTopic}.
+ */
+public class RocketMQProducer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQProducer.class);
+
+    private Config config;
+    private DefaultMQProducer producer;
+
+    /**
+     * @param config supplies the name server address and the topic
+     */
+    public RocketMQProducer(Config config) {
+        this.config = config;
+    }
+
+    /**
+     * Creates the underlying producer in group {@code BINLOG_PRODUCER_GROUP},
+     * points it at {@code config.mqNamesrvAddr} and starts it.
+     *
+     * @throws MQClientException if the producer fails to start
+     */
+    public void start() throws MQClientException {
+        this.producer = new DefaultMQProducer("BINLOG_PRODUCER_GROUP");
+        this.producer.setNamesrvAddr(config.mqNamesrvAddr);
+        this.producer.start();
+    }
+
+    /**
+     * Sends the given JSON text, UTF-8 encoded, as one message.
+     *
+     * @param json message body
+     * @return the queue offset reported in the send result
+     */
+    public long push(
+        String json) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
+
+        LOGGER.debug(json);
+
+        Message message = new Message(config.mqTopic, json.getBytes("UTF-8"));
+        return producer.send(message).getQueueOffset();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/5593575c/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java
new file mode 100644
index 0000000..604cd7f
--- /dev/null
+++ b/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/schema/Database.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.mysql.schema;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.rocketmq.mysql.binlog.EventProcessor;
+import org.apache.rocketmq.mysql.schema.column.ColumnParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Database {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
+
+    private String name;
+
+    private DataSource dataSource;
+
+    private Map<String, Table> tableMap = new HashMap<String, Table>();
+
+    public Database(String name, DataSource dataSource) {
+        this.name = name;
+        this.dataSource = dataSource;
+    }
+
+    public void init() throws SQLException {
+
+        String sql = "select table_name,column_name,data_type,column_type,character_set_name " +
+            "from information_schema.columns " +
+            "where table_schema = ?";
+
+        Connection conn = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+
+        try {
+            conn = dataSource.getConnection();
+
+            ps = conn.prepareStatement(sql);
+            ps.setString(1, name);
+            rs = ps.executeQuery();
+
+            while (rs.next()) {
+                String tableName = rs.getString(1);
+                String colName = rs.getString(2);
+                String dataType = rs.getString(3);
+                String colType = rs.getString(4);
+                String charset = rs.getString(5);
+
+                ColumnParser columnParser = ColumnParser.getColumnParser(dataType, colType, charset);
+
+                if (!tableMap.containsKey(tableName)) {
+                    addTable(tableName);
+                }
+                Table table = tableMap.get(tableName);
+                table.addCol(colName);
+                table.addParser(columnParser);
+            }
+
+        } finally {
+
+            if (conn != null) {
+                conn.close();
+            }
+            if (ps != null) {
+                ps.close();
+            }
+            if (rs != null) {
+                rs.close();
+            }
+        }
+
+    }
+
+    private void addTable(String tableName) {
+
+        LOGGER.info("Schema load -- DATABASE:{},\tTABLE:{}", name, tableName);
+
+        Table table = new Table(name, tableName);
+        tableMap.put(tableName, table);
+    }
+
+    public Table getTable(String tableName) {
+
+        return tableMap.get(tableName);
+    }
+}


[6/6] incubator-rocketmq-externals git commit: Polish the README file

Posted by vo...@apache.org.
Polish the README file


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/54020df2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/54020df2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/54020df2

Branch: refs/heads/master
Commit: 54020df25a64359a86ffdd078e1d62d108ebc82a
Parents: 39d89ea
Author: vongosling <vo...@apache.org>
Authored: Thu Jun 15 17:50:19 2017 +0800
Committer: vongosling <vo...@apache.org>
Committed: Thu Jun 15 17:50:19 2017 +0800

----------------------------------------------------------------------
 rocketmq-mysql/README.md | 28 +++++++++++++---------------
 1 file changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/54020df2/rocketmq-mysql/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/README.md b/rocketmq-mysql/README.md
index 92686fd..2b8a63e 100644
--- a/rocketmq-mysql/README.md
+++ b/rocketmq-mysql/README.md
@@ -4,29 +4,27 @@
 ## Overview
 ![overview](./doc/overview.png)
 
-The RocketMQ-MySQL is a data replicator between MySQL and other systems, 
-the replicator simulates a MySQL slave instance and parses the binlog event 
-and sends it to the RocketMQ in json format ,other systems can consume data from RocketMQ. 
-With the RocketMQ-MySQL Replicator,more systems can process data from MySQL binlog 
-in a simple and low cost method.
+RocketMQ-MySQL is a data replicator between MySQL and other systems. The replicator simulates a MySQL slave instance, parses the binlog events,
+and sends them to RocketMQ in JSON format. Other systems can then consume the data from RocketMQ. With the RocketMQ-MySQL Replicator, more systems can easily process data from the MySQL binlog at a very low cost.
 
 ## Dataflow
 ![dataflow](./doc/dataflow.png)
 
-* 1.Firstly,get the last data from the queue,and get the binlog position from this data,and if the queue data is null,then use the latest binlog position of the MySQL,and surely user can also specify this position on his own;
-* 2.Send a binlog dump request to the MySQL;
-* 3.The MySQL push binlog event to the replicator,the replicator parses the data and accumulate as a transaction-object;
-* 4.Add the next-position of the transaction to the transaction-object and send it in json format to the queue;
-* 5.Record the binlog position and the offset in the queue of the latest transaction every second.
+* 1. First, get the last message from the queue and read the binlog position from that
+ message. If the queue is empty, use the latest binlog position of MySQL. Alternatively, the user can specify the binlog position explicitly.
+* 2. Send a binlog dump request to MySQL.
+* 3. MySQL pushes binlog event to the replicator. The replicator parses the data and accumulates it as a transaction-object.
+* 4. Add the next-position of the transaction to the transaction-object and send it in json format to the queue.
+* 5. Record the binlog position and the offset in the queue of the latest transaction every second.
 
 
 ## Quick Start
 
-* 1.Create an account with MySQL replication permission,which is used to simulate the MySQL slave to get the binlog event,and the replication must be in row mode;
-* 2.Create a topic in the RocketMQ to store binlog events,in order to ensure that the downstream system consumes the data orderly,the topic must have only one queue;
-* 3.Configure relevant information of MySQL and RocketMQ in the RocketMQ-MySQL.conf file;
-* 4.Execute"mvn install",and then start the replicator(execute "nohup ./start.sh &");
-* 5.Subscribe to and process the messages in your system.
+* 1. Create an account with MySQL replication permission, which is used to simulate the MySQL slave to get the binlog event, and the replication must be in row mode.
+* 2. Create a topic in RocketMQ to store binlog events. To ensure that downstream systems consume the data in order, the topic must have only one queue.
+* 3. Configure the relevant information of MySQL and RocketMQ in the RocketMQ-MySQL.conf file.
+* 4. Execute "mvn install", then start the replicator (by executing "nohup ./start.sh &").
+* 5. Subscribe to and process the messages in your system.
 
 
 ## Configuration Instruction


[3/6] incubator-rocketmq-externals git commit: Remove useless project.RocketMQ Storm integration can be found in the Storm repo.

Posted by vo...@apache.org.
Remove useless project.RocketMQ Storm integration can be found in the Storm repo.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/9eaec194
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/9eaec194
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/9eaec194

Branch: refs/heads/master
Commit: 9eaec19410400516deff8f7718202c204b6bfd98
Parents: 5593575
Author: vongosling <vo...@alibaba-inc.com>
Authored: Wed Jun 14 20:53:49 2017 +0800
Committer: vongosling <vo...@alibaba-inc.com>
Committed: Wed Jun 14 20:53:49 2017 +0800

----------------------------------------------------------------------
 rocketmq-storm/LICENSE   | 201 ------------------------------------------
 rocketmq-storm/NOTICE    |   5 --
 rocketmq-storm/README.md |   2 -
 3 files changed, 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9eaec194/rocketmq-storm/LICENSE
----------------------------------------------------------------------
diff --git a/rocketmq-storm/LICENSE b/rocketmq-storm/LICENSE
deleted file mode 100644
index b67d909..0000000
--- a/rocketmq-storm/LICENSE
+++ /dev/null
@@ -1,201 +0,0 @@
-Apache License
-                           Version 2.0, January 2004
-                        http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-      "License" shall mean the terms and conditions for use, reproduction,
-      and distribution as defined by Sections 1 through 9 of this document.
-
-      "Licensor" shall mean the copyright owner or entity authorized by
-      the copyright owner that is granting the License.
-
-      "Legal Entity" shall mean the union of the acting entity and all
-      other entities that control, are controlled by, or are under common
-      control with that entity. For the purposes of this definition,
-      "control" means (i) the power, direct or indirect, to cause the
-      direction or management of such entity, whether by contract or
-      otherwise, or (ii) ownership of fifty percent (50%) or more of the
-      outstanding shares, or (iii) beneficial ownership of such entity.
-
-      "You" (or "Your") shall mean an individual or Legal Entity
-      exercising permissions granted by this License.
-
-      "Source" form shall mean the preferred form for making modifications,
-      including but not limited to software source code, documentation
-      source, and configuration files.
-
-      "Object" form shall mean any form resulting from mechanical
-      transformation or translation of a Source form, including but
-      not limited to compiled object code, generated documentation,
-      and conversions to other media types.
-
-      "Work" shall mean the work of authorship, whether in Source or
-      Object form, made available under the License, as indicated by a
-      copyright notice that is included in or attached to the work
-      (an example is provided in the Appendix below).
-
-      "Derivative Works" shall mean any work, whether in Source or Object
-      form, that is based on (or derived from) the Work and for which the
-      editorial revisions, annotations, elaborations, or other modifications
-      represent, as a whole, an original work of authorship. For the purposes
-      of this License, Derivative Works shall not include works that remain
-      separable from, or merely link (or bind by name) to the interfaces of,
-      the Work and Derivative Works thereof.
-
-      "Contribution" shall mean any work of authorship, including
-      the original version of the Work and any modifications or additions
-      to that Work or Derivative Works thereof, that is intentionally
-      submitted to Licensor for inclusion in the Work by the copyright owner
-      or by an individual or Legal Entity authorized to submit on behalf of
-      the copyright owner. For the purposes of this definition, "submitted"
-      means any form of electronic, verbal, or written communication sent
-      to the Licensor or its representatives, including but not limited to
-      communication on electronic mailing lists, source code control systems,
-      and issue tracking systems that are managed by, or on behalf of, the
-      Licensor for the purpose of discussing and improving the Work, but
-      excluding communication that is conspicuously marked or otherwise
-      designated in writing by the copyright owner as "Not a Contribution."
-
-      "Contributor" shall mean Licensor and any individual or Legal Entity
-      on behalf of whom a Contribution has been received by Licensor and
-      subsequently incorporated within the Work.
-
-   2. Grant of Copyright License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      copyright license to reproduce, prepare Derivative Works of,
-      publicly display, publicly perform, sublicense, and distribute the
-      Work and such Derivative Works in Source or Object form.
-
-   3. Grant of Patent License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      (except as stated in this section) patent license to make, have made,
-      use, offer to sell, sell, import, and otherwise transfer the Work,
-      where such license applies only to those patent claims licensable
-      by such Contributor that are necessarily infringed by their
-      Contribution(s) alone or by combination of their Contribution(s)
-      with the Work to which such Contribution(s) was submitted. If You
-      institute patent litigation against any entity (including a
-      cross-claim or counterclaim in a lawsuit) alleging that the Work
-      or a Contribution incorporated within the Work constitutes direct
-      or contributory patent infringement, then any patent licenses
-      granted to You under this License for that Work shall terminate
-      as of the date such litigation is filed.
-
-   4. Redistribution. You may reproduce and distribute copies of the
-      Work or Derivative Works thereof in any medium, with or without
-      modifications, and in Source or Object form, provided that You
-      meet the following conditions:
-
-      (a) You must give any other recipients of the Work or
-          Derivative Works a copy of this License; and
-
-      (b) You must cause any modified files to carry prominent notices
-          stating that You changed the files; and
-
-      (c) You must retain, in the Source form of any Derivative Works
-          that You distribute, all copyright, patent, trademark, and
-          attribution notices from the Source form of the Work,
-          excluding those notices that do not pertain to any part of
-          the Derivative Works; and
-
-      (d) If the Work includes a "NOTICE" text file as part of its
-          distribution, then any Derivative Works that You distribute must
-          include a readable copy of the attribution notices contained
-          within such NOTICE file, excluding those notices that do not
-          pertain to any part of the Derivative Works, in at least one
-          of the following places: within a NOTICE text file distributed
-          as part of the Derivative Works; within the Source form or
-          documentation, if provided along with the Derivative Works; or,
-          within a display generated by the Derivative Works, if and
-          wherever such third-party notices normally appear. The contents
-          of the NOTICE file are for informational purposes only and
-          do not modify the License. You may add Your own attribution
-          notices within Derivative Works that You distribute, alongside
-          or as an addendum to the NOTICE text from the Work, provided
-          that such additional attribution notices cannot be construed
-          as modifying the License.
-
-      You may add Your own copyright statement to Your modifications and
-      may provide additional or different license terms and conditions
-      for use, reproduction, or distribution of Your modifications, or
-      for any such Derivative Works as a whole, provided Your use,
-      reproduction, and distribution of the Work otherwise complies with
-      the conditions stated in this License.
-
-   5. Submission of Contributions. Unless You explicitly state otherwise,
-      any Contribution intentionally submitted for inclusion in the Work
-      by You to the Licensor shall be under the terms and conditions of
-      this License, without any additional terms or conditions.
-      Notwithstanding the above, nothing herein shall supersede or modify
-      the terms of any separate license agreement you may have executed
-      with Licensor regarding such Contributions.
-
-   6. Trademarks. This License does not grant permission to use the trade
-      names, trademarks, service marks, or product names of the Licensor,
-      except as required for reasonable and customary use in describing the
-      origin of the Work and reproducing the content of the NOTICE file.
-
-   7. Disclaimer of Warranty. Unless required by applicable law or
-      agreed to in writing, Licensor provides the Work (and each
-      Contributor provides its Contributions) on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-      implied, including, without limitation, any warranties or conditions
-      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
-      PARTICULAR PURPOSE. You are solely responsible for determining the
-      appropriateness of using or redistributing the Work and assume any
-      risks associated with Your exercise of permissions under this License.
-
-   8. Limitation of Liability. In no event and under no legal theory,
-      whether in tort (including negligence), contract, or otherwise,
-      unless required by applicable law (such as deliberate and grossly
-      negligent acts) or agreed to in writing, shall any Contributor be
-      liable to You for damages, including any direct, indirect, special,
-      incidental, or consequential damages of any character arising as a
-      result of this License or out of the use or inability to use the
-      Work (including but not limited to damages for loss of goodwill,
-      work stoppage, computer failure or malfunction, or any and all
-      other commercial damages or losses), even if such Contributor
-      has been advised of the possibility of such damages.
-
-   9. Accepting Warranty or Additional Liability. While redistributing
-      the Work or Derivative Works thereof, You may choose to offer,
-      and charge a fee for, acceptance of support, warranty, indemnity,
-      or other liability obligations and/or rights consistent with this
-      License. However, in accepting such obligations, You may act only
-      on Your own behalf and on Your sole responsibility, not on behalf
-      of any other Contributor, and only if You agree to indemnify,
-      defend, and hold each Contributor harmless for any liability
-      incurred by, or claims asserted against, such Contributor by reason
-      of your accepting any such warranty or additional liability.
-
-   END OF TERMS AND CONDITIONS
-
-   APPENDIX: How to apply the Apache License to your work.
-
-      To apply the Apache License to your work, attach the following
-      boilerplate notice, with the fields enclosed by brackets "{}"
-      replaced with your own identifying information. (Don't include
-      the brackets!)  The text should be enclosed in the appropriate
-      comment syntax for the file format. We also recommend that a
-      file or class name and description of purpose be included on the
-      same "printed page" as the copyright notice for easier
-      identification within third-party archives.
-
-   Copyright {}
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9eaec194/rocketmq-storm/NOTICE
----------------------------------------------------------------------
diff --git a/rocketmq-storm/NOTICE b/rocketmq-storm/NOTICE
deleted file mode 100644
index 5384857..0000000
--- a/rocketmq-storm/NOTICE
+++ /dev/null
@@ -1,5 +0,0 @@
-Apache RocketMQ (incubating)
-Copyright 2016-2017 The Apache Software Foundation
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/9eaec194/rocketmq-storm/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-storm/README.md b/rocketmq-storm/README.md
deleted file mode 100644
index cdd23d8..0000000
--- a/rocketmq-storm/README.md
+++ /dev/null
@@ -1,2 +0,0 @@
-# RocketMQ Storm Integration
-


[4/6] incubator-rocketmq-externals git commit: Remove conflict contents

Posted by vo...@apache.org.
Remove conflict contents


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/340ef4e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/340ef4e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/340ef4e8

Branch: refs/heads/master
Commit: 340ef4e8e00272441c5552614ee453e2052a8d9a
Parents: 9eaec19
Author: vongosling <vo...@alibaba-inc.com>
Authored: Thu Jun 15 17:25:56 2017 +0800
Committer: vongosling <vo...@alibaba-inc.com>
Committed: Thu Jun 15 17:25:56 2017 +0800

----------------------------------------------------------------------
 rocketmq-mysql/README.md | 12 +-----------
 1 file changed, 1 insertion(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/340ef4e8/rocketmq-mysql/README.md
----------------------------------------------------------------------
diff --git a/rocketmq-mysql/README.md b/rocketmq-mysql/README.md
index cd3b54c..92686fd 100644
--- a/rocketmq-mysql/README.md
+++ b/rocketmq-mysql/README.md
@@ -41,14 +41,4 @@ in a simple and low cost method.
 |startType         |true    |NEW_EVENT  |The way that the replicator starts processing data,there are three options available:<br>- NEW_EVENT: starts processing data from the tail of binlog<br>- LAST_PROCESSED: starts processing data from the last processed event<br>- SPECIFIED:starts processing data from the position that user specified,if you choose this option,the binlogFilename and nextPosition must not be null|
 |binlogFilename    |true    |           |If "startType" is "SPECIFIED",the replicator will begin to replicate from this binlog file|
 |nextPosition      |true    |           |If "startType" is "SPECIFIED",the replicator will begin to replicate from this position|
-|maxTransactionRows|true    |100        |max rows of the transaction pushed to RocketMQ|
-=======
-The RocketMQ-MySQL is a data replicator between MySQL and other system,the replicator parse the binglog event and send it in json format to the RocketMQ,and other system can pull data from RocketMQ.
-## Dataflow
-![dataflow](./doc/dataflow.png)
-
-* 1.Firstly,get the last data from the queue,and get the binglog position from this data,and if the queue data is null,then use the latest binglog position of the MySQL,and surely user can also specify this position on his own;
-* 2.Send a binlog dump request to the MySQL;
-* 3.The MySQL push binglog event to the Replicator,the Replicator parse the data and accumulate as a transaction-object;
-* 4.Add the next-position of the transaction to the transaction-object and send it in json format to the queue;
-* 5.Record the binglog position and the offset in the queue of the latest transaction every second.
+|maxTransactionRows|true    |100        |max rows of the transaction pushed to RocketMQ|
\ No newline at end of file


[5/6] incubator-rocketmq-externals git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals

Posted by vo...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/commit/39d89eae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/tree/39d89eae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/diff/39d89eae

Branch: refs/heads/master
Commit: 39d89eae57404e9de6d7260227e534a1c8bf6ea2
Parents: 340ef4e 90be366
Author: vongosling <vo...@apache.org>
Authored: Thu Jun 15 17:46:22 2017 +0800
Committer: vongosling <vo...@apache.org>
Committed: Thu Jun 15 17:46:22 2017 +0800

----------------------------------------------------------------------

----------------------------------------------------------------------