You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/02 09:14:30 UTC

[pulsar] branch master updated: Optimization of binlog to pulsar use canal (#3268)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bbd736d  Optimization of binlog to pulsar use canal (#3268)
bbd736d is described below

commit bbd736ddc2bd38b2df3c667b286b6f15cd31dcff
Author: tuteng <eg...@gmail.com>
AuthorDate: Wed Jan 2 17:14:24 2019 +0800

    Optimization of binlog to pulsar use canal (#3268)
    
    ### Motivation
    
    Optimize syncing the MySQL binlog to Pulsar using Canal.
    
    ### Modifications
    
    Add MessageUtils for parsing columns
    Support querying with Presto SQL
    
    ### Result
    
    #### INSERT
    mysql:
    ```
    MariaDB [aaa]> insert into users321(name, extra) values('xxxddxxxmm', 'ddd');
    ```
    python consumer:
    ```
    {u'timestamp': u'2018-12-31 00:31:24', u'message': u'[{"data":null,"database":"","es":1546234283000,"id":58,"isDdl":false,"mysqlType":null,"old":null,"sql":"insert into users321(name, extra) values(\'xxxddxxxmm\', \'ddd\')","sqlType":null,"table":"","ts":1546234284669,"type":"QUERY"},{"data":[{"isKey":"1","isNull":"0","index":"0","mysqlType":"int(11)","columnName":"id","columnValue":"34","updated":"1"},{"isKey":"0","isNull":"0","index":"1","mysqlType":"varchar(50)","columnName":"name" [...]
    ```
    presto sql:
    ```
    presto> select id, timestamp, message from pulsar."public/default".my_topic_test where id=58;
     id |      timestamp      |
    ----+---------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
     58 | 2018-12-31 00:31:24 | [{"data":null,"database":"","es":1546234283000,"id":58,"isDdl":false,"mysqlType":null,"old":null,"sql":"insert into users321(name, extra) values('xxxddxxxmm', 'ddd')","sqlType"
    ```
---
 .../{CanalSource.java => CanalAbstractSource.java} |  47 +++---
 .../apache/pulsar/io/canal/CanalByteSource.java    |  51 +++++++
 .../apache/pulsar/io/canal/CanalStringSource.java  |  75 ++++++++++
 .../org/apache/pulsar/io/canal/MessageUtils.java   | 161 +++++++++++++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |   2 +-
 5 files changed, 309 insertions(+), 27 deletions(-)

diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
similarity index 81%
rename from pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
rename to pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
index c348964..c1bcb3d 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
@@ -18,20 +18,16 @@
  */
 package org.apache.pulsar.io.canal;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
-import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
 import org.slf4j.MDC;
 
 import java.net.InetSocketAddress;
@@ -40,16 +36,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
+
 /**
- * A Simple class for mysql binlog sync to pulsar.
+ * A Simple abstract class for mysql binlog sync to pulsar.
  */
-@Connector(
-    name = "canal",
-    type = IOType.SOURCE,
-    help = "The CanalSource is used for syncing mysql binlog to Pulsar.",
-    configClass = CanalSourceConfig.class)
 @Slf4j
-public class CanalSource extends PushSource<byte[]> {
+public abstract class CanalAbstractSource<V> extends PushSource<V> {
 
     protected Thread thread = null;
 
@@ -131,10 +123,9 @@ public class CanalSource extends PushSource<byte[]> {
                 connector.subscribe();
                 while (running) {
                     Message message = connector.getWithoutAck(canalSourceConfig.getBatchSize());
-                    // delete the setRaw in new version of canal-client
                     message.setRaw(false);
-                    List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
-                    long batchId = message.getId();
+                    List<FlatMessage> flatMessages = MessageUtils.messageConverter(message);
+                    long batchId = getMessageId(message);
                     int size = message.getEntries().size();
                     if (batchId == -1 || size == 0) {
                         try {
@@ -143,10 +134,9 @@ public class CanalSource extends PushSource<byte[]> {
                         }
                     } else {
                         if (flatMessages != null) {
-                            CanalRecord canalRecord = new CanalRecord(connector);
-                            String m = JSON.toJSONString(flatMessages, SerializerFeature.WriteMapNullValue);
+                            CanalRecord<V> canalRecord = new CanalRecord<>(connector);
                             canalRecord.setId(batchId);
-                            canalRecord.setRecord(m.getBytes());
+                            canalRecord.setRecord(extractValue(flatMessages));
                             consume(canalRecord);
                         }
                     }
@@ -160,11 +150,15 @@ public class CanalSource extends PushSource<byte[]> {
         }
     }
 
+    public abstract Long getMessageId(Message message);
+
+    public abstract V extractValue(List<FlatMessage> flatMessages);
+
     @Getter
     @Setter
-    static private class CanalRecord implements Record<byte[]> {
+    static private class CanalRecord<V> implements Record<V> {
 
-        private byte[] record;
+        private V record;
         private Long id;
         private CanalConnector connector;
 
@@ -173,17 +167,19 @@ public class CanalSource extends PushSource<byte[]> {
         }
 
         @Override
-        public Optional<String>  getKey() {
-            return  Optional.of(Long.toString(id));
+        public Optional<String> getKey() {
+            return Optional.of(Long.toString(id));
         }
 
         @Override
-        public byte[] getValue() {
+        public V getValue() {
             return record;
         }
 
         @Override
-        public Optional<Long> getRecordSequence() {return Optional.of(id);}
+        public Optional<Long> getRecordSequence() {
+            return Optional.of(id);
+        }
 
         @Override
         public void ack() {
@@ -192,5 +188,4 @@ public class CanalSource extends PushSource<byte[]> {
         }
 
     }
-
-}
+}
\ No newline at end of file
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalByteSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalByteSource.java
new file mode 100644
index 0000000..9e14715
--- /dev/null
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalByteSource.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.canal;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.util.List;
+
+/**
+ * Canal source that emits each batch as the UTF-8 bytes of its JSON form.
+ */
+@Connector(
+    name = "canal",
+    type = IOType.SOURCE,
+    help = "The CanalByteSource is used for syncing mysql binlog to Pulsar.",
+    configClass = CanalSourceConfig.class)
+public class CanalByteSource extends CanalAbstractSource<byte[]> {
+    /** Returns the id carried by the canal {@code message}. */
+    @Override
+    public Long getMessageId(Message message) {
+        return message.getId();
+    }
+
+    /** Serializes the batch to JSON (null fields kept), encoded as UTF-8 rather than the platform charset. */
+    @Override
+    public byte[] extractValue(List<FlatMessage> flatMessages) {
+        String messages = JSON.toJSONString(flatMessages, SerializerFeature.WriteMapNullValue);
+        return messages.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+}
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalStringSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalStringSource.java
new file mode 100644
index 0000000..2f9bb5f
--- /dev/null
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalStringSource.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.canal;
+
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import com.alibaba.fastjson.JSON;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+/**
+ * Canal source that wraps each batch in a {@link CanalMessage}: the batch id, the JSON
+ * text of the flat messages, and a formatted timestamp taken when the value is built.
+ */
+@Connector(
+        name = "canal",
+        type = IOType.SOURCE,
+        help = "The CanalStringSource is used for syncing mysql binlog to Pulsar, easy to use presto sql search.",
+        configClass = CanalSourceConfig.class)
+public class CanalStringSource extends CanalAbstractSource<CanalMessage> {
+
+    // Id of the message last passed to getMessageId; read back by extractValue.
+    private Long messageId;
+
+    /** Remembers the id of {@code message} and returns it. */
+    @Override
+    public Long getMessageId(Message message) {
+        Long id = message.getId();
+        this.messageId = id;
+        return id;
+    }
+
+    /** Builds the record value from the JSON form of {@code flatMessages} (null fields kept). */
+    @Override
+    public CanalMessage extractValue(List<FlatMessage> flatMessages) {
+        String json = JSON.toJSONString(flatMessages, SerializerFeature.WriteMapNullValue);
+        Date now = new Date();
+        String stamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(now);
+        return new CanalMessage(this.messageId, json, stamp);
+    }
+}
+
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+class CanalMessage { // value type emitted by CanalStringSource
+    private Long id; // set from the canal Message id
+    private String message; // JSON text of the batch's FlatMessage list
+    private String timestamp; // "yyyy-MM-dd HH:mm:ss" time at which the value was built
+}
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
new file mode 100644
index 0000000..10586ce
--- /dev/null
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.canal;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.ByteString;
+
+/**
+ * Helpers that turn canal binlog {@link Message} batches into {@link FlatMessage} lists.
+ */
+public class MessageUtils {
+
+    /**
+     * Describes one column as an ordered map of string attributes.
+     *
+     * <p>Keys, in insertion order: {@code isKey}, {@code isNull} ("1"/"0"), {@code index},
+     * {@code mysqlType}, {@code columnName} and {@code columnValue}; the value is
+     * {@code null} when the column is flagged as null.
+     *
+     * @param column canal column to describe
+     * @return mutable map holding the column attributes
+     */
+    public static Map<String, String> genColumn(CanalEntry.Column column) {
+        Map<String, String> row = new LinkedHashMap<>();
+        row.put("isKey", column.getIsKey() ? "1" : "0");
+        row.put("isNull", column.getIsNull() ? "1" : "0");
+        row.put("index", Integer.toString(column.getIndex()));
+        row.put("mysqlType", column.getMysqlType());
+        row.put("columnName", column.getName());
+        row.put("columnValue", column.getIsNull() ? null : column.getValue());
+        return row;
+    }
+
+    /**
+     * Converts a canal {@link Message} into one {@link FlatMessage} per entry, skipping
+     * transaction begin/end entries.
+     *
+     * <p>For non-DDL INSERT, UPDATE and DELETE entries the column maps of all rows are
+     * collected into {@code data} (before-image for DELETE, after-image otherwise, each with
+     * an extra {@code updated} flag); UPDATE additionally fills {@code old} with the
+     * before-image. Empty lists are left unset.
+     *
+     * @param message batch to convert; may be {@code null}
+     * @return the converted messages, or {@code null} when {@code message} is {@code null}
+     * @throws RuntimeException if an entry or its row change cannot be parsed
+     */
+    public static List<FlatMessage> messageConverter(Message message) {
+        if (message == null) {
+            return null;
+        }
+        try {
+            // Raw messages hold serialized entries that have to be parsed first.
+            List<CanalEntry.Entry> entrys;
+            if (message.isRaw()) {
+                List<ByteString> rawEntries = message.getRawEntries();
+                entrys = new ArrayList<>(rawEntries.size());
+                for (ByteString byteString : rawEntries) {
+                    entrys.add(CanalEntry.Entry.parseFrom(byteString));
+                }
+            } else {
+                entrys = message.getEntries();
+            }
+
+            List<FlatMessage> flatMessages = new ArrayList<>();
+            for (CanalEntry.Entry entry : entrys) {
+                // Transaction boundary entries are skipped entirely.
+                CanalEntry.EntryType entryType = entry.getEntryType();
+                if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
+                        || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
+                    continue;
+                }
+
+                // The entry's store value is parsed as a RowChange.
+                CanalEntry.RowChange rowChange;
+                try {
+                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+                } catch (Exception e) {
+                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+                            + entry.toString(), e);
+                }
+
+                CanalEntry.EventType eventType = rowChange.getEventType();
+
+                FlatMessage flatMessage = new FlatMessage(message.getId());
+                flatMessages.add(flatMessage);
+                flatMessage.setDatabase(entry.getHeader().getSchemaName());
+                flatMessage.setTable(entry.getHeader().getTableName());
+                flatMessage.setIsDdl(rowChange.getIsDdl());
+                flatMessage.setType(eventType.toString());
+                flatMessage.setEs(entry.getHeader().getExecuteTime());
+                flatMessage.setTs(System.currentTimeMillis());
+                flatMessage.setSql(rowChange.getSql());
+
+                // Column data is only collected for non-DDL INSERT/UPDATE/DELETE events.
+                boolean isRowEvent = eventType == CanalEntry.EventType.INSERT
+                        || eventType == CanalEntry.EventType.UPDATE
+                        || eventType == CanalEntry.EventType.DELETE;
+                if (rowChange.getIsDdl() || !isRowEvent) {
+                    continue;
+                }
+
+                List<Map<String, String>> data = new ArrayList<>();
+                List<Map<String, String>> old = new ArrayList<>();
+                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                    // DELETE reports the before-image; INSERT and UPDATE report the after-image.
+                    List<CanalEntry.Column> columns = eventType == CanalEntry.EventType.DELETE
+                            ? rowData.getBeforeColumnsList()
+                            : rowData.getAfterColumnsList();
+                    for (CanalEntry.Column column : columns) {
+                        Map<String, String> row = genColumn(column);
+                        row.put("updated", column.getUpdated() ? "1" : "0");
+                        data.add(row);
+                    }
+                    // UPDATE additionally records the before-image columns in "old".
+                    if (eventType == CanalEntry.EventType.UPDATE) {
+                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+                            old.add(genColumn(column));
+                        }
+                    }
+                }
+
+                if (!data.isEmpty()) {
+                    flatMessage.setData(data);
+                }
+                if (!old.isEmpty()) {
+                    flatMessage.setOld(old);
+                }
+            }
+            return flatMessages;
+        } catch (RuntimeException e) {
+            // Already unchecked (including the parse failure above): rethrow without re-wrapping.
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
index fa48f39..2ecf4df 100644
--- a/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,4 +19,4 @@
 
 name: canal
 description: canal source and read data from mysql
-sourceClass: org.apache.pulsar.io.canal.CanalSource
+sourceClass: org.apache.pulsar.io.canal.CanalStringSource