You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/02 09:14:30 UTC
[pulsar] branch master updated: Optimization of binlog to pulsar
use canal (#3268)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bbd736d Optimization of binlog to pulsar use canal (#3268)
bbd736d is described below
commit bbd736ddc2bd38b2df3c667b286b6f15cd31dcff
Author: tuteng <eg...@gmail.com>
AuthorDate: Wed Jan 2 17:14:24 2019 +0800
Optimization of binlog to pulsar use canal (#3268)
### Motivation
Optimize syncing the MySQL binlog to Pulsar via Canal.
### Modifications
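Add MessageUtils to parse Canal entries and their columns.
Split CanalSource into CanalAbstractSource with CanalByteSource and CanalStringSource subclasses, and make CanalStringSource the connector's default source class.
Support querying the synced data with Presto SQL.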
### Result
#### INSERT
mysql:
```
MariaDB [aaa]> insert into users321(name, extra) values('xxxddxxxmm', 'ddd');
```
python consumer:
```
{u'timestamp': u'2018-12-31 00:31:24', u'message': u'[{"data":null,"database":"","es":1546234283000,"id":58,"isDdl":false,"mysqlType":null,"old":null,"sql":"insert into users321(name, extra) values(\'xxxddxxxmm\', \'ddd\')","sqlType":null,"table":"","ts":1546234284669,"type":"QUERY"},{"data":[{"isKey":"1","isNull":"0","index":"0","mysqlType":"int(11)","columnName":"id","columnValue":"34","updated":"1"},{"isKey":"0","isNull":"0","index":"1","mysqlType":"varchar(50)","columnName":"name" [...]
```
presto sql:
```
presto> select id, timestamp, message from pulsar."public/default".my_topic_test where id=58;
id | timestamp |
----+---------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
58 | 2018-12-31 00:31:24 | [{"data":null,"database":"","es":1546234283000,"id":58,"isDdl":false,"mysqlType":null,"old":null,"sql":"insert into users321(name, extra) values('xxxddxxxmm', 'ddd')","sqlType"
```
---
.../{CanalSource.java => CanalAbstractSource.java} | 47 +++---
.../apache/pulsar/io/canal/CanalByteSource.java | 51 +++++++
.../apache/pulsar/io/canal/CanalStringSource.java | 75 ++++++++++
.../org/apache/pulsar/io/canal/MessageUtils.java | 161 +++++++++++++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 2 +-
5 files changed, 309 insertions(+), 27 deletions(-)
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
similarity index 81%
rename from pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
rename to pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
index c348964..c1bcb3d 100644
--- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSource.java
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java
@@ -18,20 +18,16 @@
*/
package org.apache.pulsar.io.canal;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
-import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
import org.slf4j.MDC;
import java.net.InetSocketAddress;
@@ -40,16 +36,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+
/**
- * A Simple class for mysql binlog sync to pulsar.
+ * A Simple abstract class for mysql binlog sync to pulsar.
*/
-@Connector(
- name = "canal",
- type = IOType.SOURCE,
- help = "The CanalSource is used for syncing mysql binlog to Pulsar.",
- configClass = CanalSourceConfig.class)
@Slf4j
-public class CanalSource extends PushSource<byte[]> {
+public abstract class CanalAbstractSource<V> extends PushSource<V> {
protected Thread thread = null;
@@ -131,10 +123,9 @@ public class CanalSource extends PushSource<byte[]> {
connector.subscribe();
while (running) {
Message message = connector.getWithoutAck(canalSourceConfig.getBatchSize());
- // delete the setRaw in new version of canal-client
message.setRaw(false);
- List<FlatMessage> flatMessages = FlatMessage.messageConverter(message);
- long batchId = message.getId();
+ List<FlatMessage> flatMessages = MessageUtils.messageConverter(message);
+ long batchId = getMessageId(message);
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
@@ -143,10 +134,9 @@ public class CanalSource extends PushSource<byte[]> {
}
} else {
if (flatMessages != null) {
- CanalRecord canalRecord = new CanalRecord(connector);
- String m = JSON.toJSONString(flatMessages, SerializerFeature.WriteMapNullValue);
+ CanalRecord<V> canalRecord = new CanalRecord<>(connector);
canalRecord.setId(batchId);
- canalRecord.setRecord(m.getBytes());
+ canalRecord.setRecord(extractValue(flatMessages));
consume(canalRecord);
}
}
@@ -160,11 +150,15 @@ public class CanalSource extends PushSource<byte[]> {
}
}
+ public abstract Long getMessageId(Message message);
+
+ public abstract V extractValue(List<FlatMessage> flatMessages);
+
@Getter
@Setter
- static private class CanalRecord implements Record<byte[]> {
+ static private class CanalRecord<V> implements Record<V> {
- private byte[] record;
+ private V record;
private Long id;
private CanalConnector connector;
@@ -173,17 +167,19 @@ public class CanalSource extends PushSource<byte[]> {
}
@Override
- public Optional<String> getKey() {
- return Optional.of(Long.toString(id));
+ public Optional<String> getKey() {
+ return Optional.of(Long.toString(id));
}
@Override
- public byte[] getValue() {
+ public V getValue() {
return record;
}
@Override
- public Optional<Long> getRecordSequence() {return Optional.of(id);}
+ public Optional<Long> getRecordSequence() {
+ return Optional.of(id);
+ }
@Override
public void ack() {
@@ -192,5 +188,4 @@ public class CanalSource extends PushSource<byte[]> {
}
}
-
-}
+}
\ No newline at end of file
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalByteSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalByteSource.java
new file mode 100644
index 0000000..9e14715
--- /dev/null
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalByteSource.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.canal;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * A Canal source that publishes each batch of MySQL binlog changes to Pulsar
+ * as the UTF-8 encoded JSON array of the batch's {@link FlatMessage}s.
+ */
+@Connector(
+    name = "canal",
+    type = IOType.SOURCE,
+    help = "The CanalByteSource is used for syncing mysql binlog to Pulsar.",
+    configClass = CanalSourceConfig.class)
+public class CanalByteSource extends CanalAbstractSource<byte[]> {
+
+    /**
+     * Returns the Canal batch id of {@code message}.
+     *
+     * @param message the batch fetched from Canal
+     * @return the batch id reported by {@link Message#getId()}
+     */
+    @Override
+    public Long getMessageId(Message message) {
+        return message.getId();
+    }
+
+    /**
+     * Serializes the flat messages to JSON, keeping null-valued fields.
+     *
+     * @param flatMessages the converted binlog events of one batch
+     * @return the JSON text encoded as UTF-8
+     */
+    @Override
+    public byte[] extractValue(List<FlatMessage> flatMessages) {
+        String messages = JSON.toJSONString(flatMessages, SerializerFeature.WriteMapNullValue);
+        // Use an explicit charset: getBytes() without one depends on the
+        // platform default, so non-ASCII column values would vary by host.
+        return messages.getBytes(StandardCharsets.UTF_8);
+    }
+
+}
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalStringSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalStringSource.java
new file mode 100644
index 0000000..2f9bb5f
--- /dev/null
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalStringSource.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.canal;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+/**
+ * A Canal source that wraps each batch of MySQL binlog changes in a
+ * {@link CanalMessage} carrying the batch id, the JSON-serialized flat messages
+ * and a conversion timestamp, so the fields can be queried with Presto SQL.
+ *
+ * <p>{@link #extractValue(List)} uses the batch id remembered by the most recent
+ * {@link #getMessageId(Message)} call, so both must be called, in that order,
+ * for the same batch.
+ */
+@Connector(
+    name = "canal",
+    type = IOType.SOURCE,
+    help = "The CanalStringSource is used for syncing mysql binlog to Pulsar, easy to use presto sql search.",
+    configClass = CanalSourceConfig.class)
+public class CanalStringSource extends CanalAbstractSource<CanalMessage> {
+
+    // DateTimeFormatter is immutable and thread-safe, so one shared instance
+    // replaces a SimpleDateFormat allocated for every batch.
+    private static final DateTimeFormatter TIMESTAMP_FORMAT =
+        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    /** Batch id of the message most recently passed to {@link #getMessageId(Message)}. */
+    private Long messageId;
+
+    /**
+     * Returns the Canal batch id of {@code message} and remembers it for
+     * the following {@link #extractValue(List)} call.
+     *
+     * @param message the batch fetched from Canal
+     * @return the batch id reported by {@link Message#getId()}
+     */
+    @Override
+    public Long getMessageId(Message message) {
+        this.messageId = message.getId();
+        return this.messageId;
+    }
+
+    /**
+     * Wraps the JSON-serialized flat messages (null-valued fields kept) in a
+     * {@link CanalMessage} stamped with the current local time in the JVM's
+     * default time zone, formatted as {@code yyyy-MM-dd HH:mm:ss}.
+     *
+     * @param flatMessages the converted binlog events of one batch
+     * @return the message to publish
+     */
+    @Override
+    public CanalMessage extractValue(List<FlatMessage> flatMessages) {
+        String messages = JSON.toJSONString(flatMessages, SerializerFeature.WriteMapNullValue);
+        CanalMessage canalMessage = new CanalMessage();
+        canalMessage.setTimestamp(LocalDateTime.now().format(TIMESTAMP_FORMAT));
+        canalMessage.setId(this.messageId);
+        canalMessage.setMessage(messages);
+        return canalMessage;
+    }
+
+}
+
+/**
+ * Value published by {@link CanalStringSource}: the batch id, the JSON-serialized
+ * flat messages of the batch, and the time the batch was converted.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+class CanalMessage {
+    private Long id;
+    private String message;
+    private String timestamp;
+}
diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
new file mode 100644
index 0000000..10586ce
--- /dev/null
+++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.canal;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.FlatMessage;
+import com.alibaba.otter.canal.protocol.Message;
+import com.google.protobuf.ByteString;
+
+/**
+ * A Simple class for mysql binlog message to parse.
+ */
+public class MessageUtils {
+
+ public static Map<String, String> genColumn(CanalEntry.Column column) {
+ Map<String, String> row = new LinkedHashMap<>();
+ if (column.getIsKey()) {
+ row.put("isKey", "1");
+ } else {
+ row.put("isKey", "0");
+ }
+ if (column.getIsNull()) {
+ row.put("isNull", "1");
+ } else {
+ row.put("isNull", "0");
+ }
+ row.put("index", Integer.toString(column.getIndex()));
+ row.put("mysqlType", column.getMysqlType());
+ row.put("columnName", column.getName());
+ if (column.getIsNull()) {
+ row.put("columnValue", null);
+ } else {
+ row.put("columnValue", column.getValue());
+ }
+ return row;
+ }
+
+ /**
+ * Message convert to FlatMessage
+ *
+ * @param message
+ * @return FlatMessage List
+ */
+ public static List<FlatMessage> messageConverter(Message message) {
+ try {
+ if (message == null) {
+ return null;
+ }
+
+ List<FlatMessage> flatMessages = new ArrayList<>();
+ List<CanalEntry.Entry> entrys = null;
+ if (message.isRaw()) {
+ List<ByteString> rawEntries = message.getRawEntries();
+ entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
+ for (ByteString byteString : rawEntries) {
+ CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
+ entrys.add(entry);
+ }
+ } else {
+ entrys = message.getEntries();
+ }
+
+ for (CanalEntry.Entry entry : entrys) {
+ if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
+ || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+ continue;
+ }
+
+ CanalEntry.RowChange rowChange;
+ try {
+ rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+ } catch (Exception e) {
+ throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+ + entry.toString(), e);
+ }
+
+ CanalEntry.EventType eventType = rowChange.getEventType();
+
+ FlatMessage flatMessage = new FlatMessage(message.getId());
+ flatMessages.add(flatMessage);
+ flatMessage.setDatabase(entry.getHeader().getSchemaName());
+ flatMessage.setTable(entry.getHeader().getTableName());
+ flatMessage.setIsDdl(rowChange.getIsDdl());
+ flatMessage.setType(eventType.toString());
+ flatMessage.setEs(entry.getHeader().getExecuteTime());
+ flatMessage.setTs(System.currentTimeMillis());
+ flatMessage.setSql(rowChange.getSql());
+
+ if (!rowChange.getIsDdl()) {
+ List<Map<String, String>> data = new ArrayList<>();
+ List<Map<String, String>> old = new ArrayList<>();
+
+ for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+ if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
+ && eventType != CanalEntry.EventType.DELETE) {
+ continue;
+ }
+
+ List<CanalEntry.Column> columns;
+
+ if (eventType == CanalEntry.EventType.DELETE) {
+ columns = rowData.getBeforeColumnsList();
+ } else {
+ columns = rowData.getAfterColumnsList();
+ }
+ columns.size();
+ for (CanalEntry.Column column : columns) {
+ Map<String, String> row = genColumn(column);
+ if (column.getUpdated()) {
+ row.put("updated", "1");
+ } else {
+ row.put("updated", "0");
+ }
+ data.add(row);
+ }
+
+ if (eventType == CanalEntry.EventType.UPDATE) {
+ for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+ Map<String, String> rowOld = genColumn(column);
+ old.add(rowOld);
+ }
+ }
+ }
+
+ if (!data.isEmpty()) {
+ flatMessage.setData(data);
+ }
+ if (!old.isEmpty()) {
+ flatMessage.setOld(old);
+ }
+ }
+ }
+ return flatMessages;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
index fa48f39..2ecf4df 100644
--- a/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/canal/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,4 +19,4 @@
name: canal
description: canal source and read data from mysql
-sourceClass: org.apache.pulsar.io.canal.CanalSource
+sourceClass: org.apache.pulsar.io.canal.CanalStringSource