Posted to commits@hugegraph.apache.org by GitBox <gi...@apache.org> on 2022/06/06 07:50:24 UTC

[GitHub] [incubator-hugegraph-toolchain] simon824 opened a new pull request, #291: feat: Introduce HugeGraphFlinkCDCLoader

simon824 opened a new pull request, #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291

   closed #290


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-hugegraph-toolchain] javeme commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

javeme commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r890733268


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+    private static final String JDBC_PREFIX = "jdbc:";
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }

Review Comment:
   I think it's a good idea; let's do it in the next PR.





[GitHub] [incubator-hugegraph-toolchain] simon824 commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

simon824 commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r890703900


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+    private static final String JDBC_PREFIX = "jdbc:";
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }

Review Comment:
   About testing: I plan to introduce `testcontainers` for end-to-end (e2e) tests, starting 3 containers (MySQL, Flink and HugeGraph) to verify the correctness of the entire process (and similarly for the Spark loader). What do you think?
   
   This may take some time, so I will submit it later or in the next PR.





[GitHub] [incubator-hugegraph-toolchain] codecov[bot] commented on pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

codecov[bot] commented on PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#issuecomment-1147158392

   # [Codecov](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#291](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (80b20e9) into [master](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/commit/9ee7abb2ee397db636e4c1a1f258f0c7bcd1b5c1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (9ee7abb) will **decrease** coverage by `3.31%`.
   > The diff coverage is `0.52%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #291      +/-   ##
   ============================================
   - Coverage     71.12%   67.80%   -3.32%     
     Complexity      877      877              
   ============================================
     Files            82       86       +4     
     Lines          3816     4004     +188     
     Branches        456      476      +20     
   ============================================
   + Hits           2714     2715       +1     
   - Misses          898     1085     +187     
     Partials        204      204              
   ```
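The two coverage figures, and the small mismatch between the `3.31%` in the summary line and the `-3.32%` in the table, can be reproduced from the Hits/Misses/Partials rows above. Codecov documents its coverage ratio as hits / (hits + misses + partials); the sketch below additionally assumes that displayed values are truncated (not rounded) to two decimals, which is consistent with these numbers but is not stated in the report. `CoverageMath` and its methods are illustrative names only.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class CoverageMath {

    // Codecov's coverage ratio: hits / (hits + misses + partials), in percent
    static BigDecimal coverage(int hits, int misses, int partials) {
        int lines = hits + misses + partials;
        return BigDecimal.valueOf(hits * 100L)
                         .divide(BigDecimal.valueOf(lines), 10, RoundingMode.HALF_EVEN);
    }

    // Assumption: displayed values are truncated to two decimals
    static BigDecimal display(BigDecimal percent) {
        return percent.setScale(2, RoundingMode.DOWN);
    }

    public static void main(String[] args) {
        BigDecimal master = coverage(2714, 898, 204);  // 3816 lines
        BigDecimal pr = coverage(2715, 1085, 204);     // 4004 lines
        System.out.println("master: " + display(master));  // 71.12
        System.out.println("#291:   " + display(pr));      // 67.80
        // Summary delta: difference of the unrounded ratios (3.3144...)
        System.out.println("summary delta: " + display(master.subtract(pr)));           // 3.31
        // Table delta: difference of the two displayed values
        System.out.println("table delta:   " + display(master).subtract(display(pr)));  // 3.32
    }
}
```

Under that assumption, the summary line subtracts the unrounded ratios (71.1216... - 67.8072...), while the table subtracts the already-truncated values (71.12 - 67.80), which explains the 0.01 difference.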
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...com/baidu/hugegraph/loader/constant/Constants.java](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWxvYWRlci9zcmMvbWFpbi9qYXZhL2NvbS9iYWlkdS9odWdlZ3JhcGgvbG9hZGVyL2NvbnN0YW50L0NvbnN0YW50cy5qYXZh) | `75.00% <ø> (ø)` | |
   | [...gegraph/loader/flink/HugeGraphDeserialization.java](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWxvYWRlci9zcmMvbWFpbi9qYXZhL2NvbS9iYWlkdS9odWdlZ3JhcGgvbG9hZGVyL2ZsaW5rL0h1Z2VHcmFwaERlc2VyaWFsaXphdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ugegraph/loader/flink/HugeGraphFlinkCDCLoader.java](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWxvYWRlci9zcmMvbWFpbi9qYXZhL2NvbS9iYWlkdS9odWdlZ3JhcGgvbG9hZGVyL2ZsaW5rL0h1Z2VHcmFwaEZsaW5rQ0RDTG9hZGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [.../hugegraph/loader/flink/HugeGraphOutputFormat.java](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWxvYWRlci9zcmMvbWFpbi9qYXZhL2NvbS9iYWlkdS9odWdlZ3JhcGgvbG9hZGVyL2ZsaW5rL0h1Z2VHcmFwaE91dHB1dEZvcm1hdC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../hugegraph/loader/flink/HugeGraphSinkFunction.java](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWxvYWRlci9zcmMvbWFpbi9qYXZhL2NvbS9iYWlkdS9odWdlZ3JhcGgvbG9hZGVyL2ZsaW5rL0h1Z2VHcmFwaFNpbmtGdW5jdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...u/hugegraph/loader/spark/HugeGraphSparkLoader.java](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWxvYWRlci9zcmMvbWFpbi9qYXZhL2NvbS9iYWlkdS9odWdlZ3JhcGgvbG9hZGVyL3NwYXJrL0h1Z2VHcmFwaFNwYXJrTG9hZGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...m/baidu/hugegraph/loader/executor/LoadOptions.java](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aHVnZWdyYXBoLWxvYWRlci9zcmMvbWFpbi9qYXZhL2NvbS9iYWlkdS9odWdlZ3JhcGgvbG9hZGVyL2V4ZWN1dG9yL0xvYWRPcHRpb25zLmphdmE=) | `74.71% <100.00%> (+0.29%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [9ee7abb...80b20e9](https://codecov.io/gh/apache/incubator-hugegraph-toolchain/pull/291?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [incubator-hugegraph-toolchain] javeme commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

javeme commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r902259937


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -93,7 +93,7 @@ private MySqlSource<String> buildMysqlSource(InputStruct struct) {
             port = uriBuilder.getPort();
         } catch (URISyntaxException e) {
             throw new IllegalArgumentException(
-                    String.format("Failed to parse Url(%s) to get hostName and port", url), e);
+                    String.format("Failed to parse url(%s) to get hostName and port", url), e);

Review Comment:
   tiny: we can use `hostname` (instead of `hostName`) in this message.
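This hunk already throws `IllegalArgumentException` for a malformed URL, and the comment asks for `hostname` in the message. A minimal stdlib-only sketch of the same parsing step is below. It swaps the PR's Apache `URIBuilder` for `java.net.URI`; the class `JdbcUrlParser`, its nested `Endpoint` type, and the choice to reject a URL without an explicit port are assumptions for illustration, not the loader's actual code.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class JdbcUrlParser {

    private static final String JDBC_PREFIX = "jdbc:";

    // Hypothetical value type for the parsed host and port
    static final class Endpoint {
        final String hostname;
        final int port;

        Endpoint(String hostname, int port) {
            this.hostname = hostname;
            this.port = port;
        }
    }

    static Endpoint parse(String url) {
        if (url == null || !url.startsWith(JDBC_PREFIX)) {
            throw new IllegalArgumentException(String.format(
                    "The url(%s) must start with '%s'", url, JDBC_PREFIX));
        }
        URI uri;
        try {
            // "jdbc:mysql://host:3306/db" -> "mysql://host:3306/db"
            uri = new URI(url.substring(JDBC_PREFIX.length()));
        } catch (URISyntaxException e) {
            // Bad input from the caller, so IllegalArgumentException fits
            // better than a bare RuntimeException
            throw new IllegalArgumentException(String.format(
                    "Failed to parse url(%s) to get hostname and port", url), e);
        }
        // URI.getHost() returns null and getPort() returns -1 when absent
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException(String.format(
                    "Failed to parse url(%s) to get hostname and port", url));
        }
        return new Endpoint(uri.getHost(), uri.getPort());
    }
}
```

Falling back to MySQL's default port 3306 instead of rejecting a port-less URL would be an equally reasonable design choice.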





[GitHub] [incubator-hugegraph-toolchain] javeme commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

javeme commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r890817478


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphDeserialization.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphDeserialization implements DebeziumDeserializationSchema<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphDeserialization.class);
+
+    @Override
+    public void deserialize(SourceRecord sourceRecord,
+                            Collector<String> collector) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode result = mapper.createObjectNode();
+
+        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
+        String op = operation.code();
+        Struct value = (Struct) sourceRecord.value();
+        Struct data;
+        switch (operation) {
+            case DELETE:
+                data = value.getStruct("before");
+                break;
+            case CREATE:
+            case READ:
+            case UPDATE:
+                data = value.getStruct("after");
+                break;
+            default:
+                throw new RuntimeException("The type of `op` should be 'c' 'r' 'u' 'd' only");
+        }
+        ObjectNode rootNode = mapper.createObjectNode();
+        if (data != null) {
+            Schema afterSchema = data.schema();
+            List<Field> afterFields = afterSchema.fields();
+            for (Field field : afterFields) {
+                Object afterValue = data.get(field);
+                rootNode.put(field.name(), afterValue.toString());
+            }
+        }
+
+        result.set(Constants.CDC_DATA, rootNode);
+        result.put(Constants.CDC_OP, op);
+        LOG.debug("Loaded data: {}", result.toString());
+        collector.collect(result.toString());
+

Review Comment:
   Unnecessary blank line.
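The `switch` in `deserialize()` above picks the `before` image for deletes and the `after` image for creates, snapshot reads and updates, then flattens the chosen row into column/string pairs. A stdlib-only sketch of that rule follows, with plain `Map`s standing in for Kafka Connect `Struct`s (an assumption to keep it self-contained); `CdcRowSelector` is a hypothetical name. Unlike the hunk, it maps a null column value to null rather than calling `toString()` on it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class CdcRowSelector {

    // Debezium op codes: c = create, r = read (snapshot), u = update, d = delete
    static Map<String, String> select(String op,
                                      Map<String, Object> before,
                                      Map<String, Object> after) {
        Map<String, Object> data;
        switch (op) {
            case "d":
                data = before;  // a deleted row only has a "before" image
                break;
            case "c":
            case "r":
            case "u":
                data = after;   // the new state of the row
                break;
            default:
                throw new IllegalArgumentException(
                        "The type of `op` should be 'c', 'r', 'u' or 'd' only, but got: " + op);
        }
        // Flatten to column -> string, preserving column order
        Map<String, String> row = new LinkedHashMap<>();
        if (data != null) {
            for (Map.Entry<String, Object> e : data.entrySet()) {
                Object value = e.getValue();
                row.put(e.getKey(), value == null ? null : value.toString());
            }
        }
        return row;
    }
}
```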



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                    "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+                synchronized (HugeGraphOutputFormat.this) {
+                    if (!closed) {
+                        try {
+                            flushAll();
+                        } catch (Exception e) {
+                            LOG.error("Failed to flush all data.", e);

Review Comment:
   Do we need to stop if too many errors occur?
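One possible answer, sketched under assumptions: count consecutive flush failures, stop the periodic task once a threshold is reached, and keep the last error so the caller (for example `writeRecord()` or `close()`) can rethrow it. The sketch relies on the documented `ScheduledExecutorService.scheduleWithFixedDelay` behaviour that an exception thrown by the task suppresses all subsequent executions. `FlushScheduler` and `maxConsecutiveFailures` are hypothetical names, not options of the loader.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FlushScheduler {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable flush;
    private final int maxConsecutiveFailures;
    private int consecutiveFailures = 0;  // only touched by the scheduled task
    private volatile Exception lastError;
    private ScheduledFuture<?> future;

    FlushScheduler(Runnable flush, int maxConsecutiveFailures) {
        this.flush = flush;
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    void start(long flushIntervalMs) {
        this.future = this.scheduler.scheduleWithFixedDelay(this::flushOnce,
                flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void flushOnce() {
        try {
            this.flush.run();
            this.consecutiveFailures = 0;  // a success resets the counter
        } catch (RuntimeException e) {
            this.lastError = e;
            if (++this.consecutiveFailures >= this.maxConsecutiveFailures) {
                // Throwing suppresses all later executions of this task
                throw new IllegalStateException("Stop flushing after " +
                        this.consecutiveFailures + " consecutive failures", e);
            }
            // Otherwise swallow the error and retry on the next tick
        }
    }

    // Callers can check this and rethrow, e.g. from writeRecord() or close()
    Exception lastError() {
        return this.lastError;
    }

    // True once periodic flushing has stopped (task threw or was cancelled)
    boolean awaitStopped(long timeoutMs) {
        try {
            this.future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException | CancellationException e) {
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    void close() {
        this.scheduler.shutdownNow();
    }
}
```

Counting consecutive rather than total failures means a transient outage that recovers does not eventually kill the job.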



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                    "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+                synchronized (HugeGraphOutputFormat.this) {

Review Comment:
   Can we extract this into a private method for better readability?
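A sketch of what the extraction could look like: the lambda passed to `scheduleWithFixedDelay` shrinks to a method reference, and the closed-check plus flush move into one named `synchronized` method, which locks `this` just like `synchronized (HugeGraphOutputFormat.this)` in the hunk. `PeriodicFlusher`, `flushIfOpen()` and the in-memory `flushed` list (standing in for batch requests sent to HugeGraph) are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class PeriodicFlusher {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final List<String> buffer = new ArrayList<>();
    // Stands in for the batch requests that would be sent to HugeGraph
    private final List<List<String>> flushed = new ArrayList<>();
    private volatile boolean closed = false;

    void open(long flushIntervalMs) {
        if (flushIntervalMs > 0) {
            // The scheduled task is now a one-line method reference
            this.scheduler.scheduleWithFixedDelay(this::flushIfOpen,
                    flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void writeRecord(String record) {
        this.buffer.add(record);
    }

    // Extracted body of the former lambda
    private synchronized void flushIfOpen() {
        if (this.closed) {
            return;
        }
        try {
            this.flushAll();
        } catch (RuntimeException e) {
            System.err.println("Failed to flush all data: " + e);
        }
    }

    private void flushAll() {
        if (!this.buffer.isEmpty()) {
            this.flushed.add(new ArrayList<>(this.buffer));
            this.buffer.clear();
        }
    }

    synchronized void close() {
        if (this.closed) {
            return;
        }
        this.flushAll();  // flush whatever is left before shutting down
        this.closed = true;
        this.scheduler.shutdown();
    }

    synchronized List<List<String>> flushed() {
        return new ArrayList<>(this.flushed);
    }
}
```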



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+    private static final String JDBC_PREFIX = "jdbc:";
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+            JDBCSource input = (JDBCSource) struct.input();
+            String url = input.url();
+            String host;
+            int port;
+            try {
+                URIBuilder uriBuilder = new URIBuilder(url.substring(JDBC_PREFIX.length()));
+                host = uriBuilder.getHost();
+                port = uriBuilder.getPort();
+            } catch (URISyntaxException e) {
+                throw new RuntimeException(

Review Comment:
   Throw IllegalArgumentException here?
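A minimal sketch of what the suggestion could look like. It assumes a hypothetical helper named JdbcUrls and uses java.net.URI instead of the Apache URIBuilder from the diff, so it compiles without extra dependencies. A malformed URL, or one missing an explicit host and port, is reported as IllegalArgumentException, and the URISyntaxException is kept as the cause.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: extracts host and port from a JDBC URL such as
// "jdbc:mysql://127.0.0.1:3306/db" and reports bad input as
// IllegalArgumentException rather than a bare RuntimeException.
final class JdbcUrls {

    private static final String JDBC_PREFIX = "jdbc:";

    private JdbcUrls() {
    }

    static URI parse(String url) {
        if (url == null || !url.startsWith(JDBC_PREFIX)) {
            throw new IllegalArgumentException(String.format(
                      "Invalid JDBC url '%s', expect prefix '%s'", url, JDBC_PREFIX));
        }
        URI uri;
        try {
            // Strip "jdbc:" so "mysql://host:port/db" parses as a hierarchical URI
            uri = new URI(url.substring(JDBC_PREFIX.length()));
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(String.format(
                      "Failed to parse url '%s' to get host and port", url), e);
        }
        // URI reports a missing host as null and a missing port as -1
        if (uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException(String.format(
                      "Url '%s' must contain an explicit host and port", url));
        }
        return uri;
    }
}
```

IllegalArgumentException tells the caller that the mapping file contains bad input rather than that the loader hit an internal failure.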



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphSinkFunction.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import javax.annotation.Nonnull;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+public class HugeGraphSinkFunction<T> extends RichSinkFunction<T>
+                                      implements CheckpointedFunction {
+
+    private static final long serialVersionUID = -2259171589402599426L;
+    private final HugeGraphOutputFormat<Object> outputFormat;
+
+    public HugeGraphSinkFunction(@Nonnull HugeGraphOutputFormat<Object> outputFormat) {
+        this.outputFormat = Preconditions.checkNotNull(outputFormat);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        RuntimeContext ctx = getRuntimeContext();
+        outputFormat.setRuntimeContext(ctx);

Review Comment:
   Use this.outputFormat here.



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphDeserialization.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphDeserialization implements DebeziumDeserializationSchema<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphDeserialization.class);
+
+    @Override
+    public void deserialize(SourceRecord sourceRecord,
+                            Collector<String> collector) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode result = mapper.createObjectNode();
+
+        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
+        String op = operation.code();
+        Struct value = (Struct) sourceRecord.value();
+        Struct data;
+        switch (operation) {
+            case DELETE:
+                data = value.getStruct("before");
+                break;
+            case CREATE:
+            case READ:
+            case UPDATE:
+                data = value.getStruct("after");
+                break;
+            default:
+                throw new RuntimeException("The type of `op` should be 'c' 'r' 'u' 'd' only");

Review Comment:
   Ditto: throw IllegalArgumentException here?
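A dependency-free sketch of the same switch with IllegalArgumentException in the default branch. CdcOp is a hypothetical stand-in for io.debezium.data.Envelope.Operation, and the row images are plain maps instead of Kafka Connect Structs. Newer Debezium versions may define further operations (TRUNCATE is used here as an example), and those fall through to the default branch.

```java
import java.util.Map;

// Hypothetical stand-in for io.debezium.data.Envelope.Operation.
enum CdcOp {
    READ("r"), CREATE("c"), UPDATE("u"), DELETE("d"), TRUNCATE("t");

    private final String code;

    CdcOp(String code) {
        this.code = code;
    }

    String code() {
        return this.code;
    }
}

final class RowImages {

    private RowImages() {
    }

    // Picks the row image to emit: "before" for deletes,
    // "after" for snapshot reads, inserts and updates.
    static Map<String, Object> select(CdcOp op, Map<String, Object> before,
                                      Map<String, Object> after) {
        switch (op) {
            case DELETE:
                return before;
            case READ:
            case CREATE:
            case UPDATE:
                return after;
            default:
                throw new IllegalArgumentException(String.format(
                          "The type of `op` should be 'c' 'r' 'u' 'd' only, but got '%s'",
                          op.code()));
        }
    }
}
```

Including the offending code in the message makes an unexpected operation easy to spot in the job logs.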



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+    private static final String JDBC_PREFIX = "jdbc:";
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+            JDBCSource input = (JDBCSource) struct.input();
+            String url = input.url();
+            String host;
+            int port;
+            try {
+                URIBuilder uriBuilder = new URIBuilder(url.substring(JDBC_PREFIX.length()));
+                host = uriBuilder.getHost();
+                port = uriBuilder.getPort();
+            } catch (URISyntaxException e) {
+                throw new RuntimeException(
+                        String.format("Failed to parse Url(%s) to get hostName and port",
+                                      url), e);
+            }
+
+            MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
+                    .hostname(host)
+                    .port(port)
+                    .databaseList(input.database())
+                    .tableList(input.database() + "." + input.table())
+                    .username(input.username())
+                    .password(input.password())
+                    .deserializer(new HugeGraphDeserialization())
+                    .build();

Review Comment:
   Prefer adding a method buildMysqlSource() and moving lines 69~90 into it.
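A sketch of the suggested extraction. To keep it compilable without flink-cdc on the classpath, buildMysqlSource() here returns a hypothetical MySqlSourceSpec value, and JdbcInput is a hypothetical stand-in for JDBCSource. In the loader, the method would instead end with the MySqlSource.<String>builder()...build() chain from the diff and return MySqlSource<String>.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical stand-in for the JDBCSource getters used in the diff.
final class JdbcInput {
    final String url, database, table, username, password;

    JdbcInput(String url, String database, String table,
              String username, String password) {
        this.url = url;
        this.database = database;
        this.table = table;
        this.username = username;
        this.password = password;
    }
}

// Hypothetical holder for the arguments passed to MySqlSource.<String>builder().
final class MySqlSourceSpec {
    final String hostname, databaseList, tableList, username, password;
    final int port;

    MySqlSourceSpec(String hostname, int port, String databaseList,
                    String tableList, String username, String password) {
        this.hostname = hostname;
        this.port = port;
        this.databaseList = databaseList;
        this.tableList = tableList;
        this.username = username;
        this.password = password;
    }
}

final class CdcSources {

    private static final String JDBC_PREFIX = "jdbc:";

    private CdcSources() {
    }

    // URL parsing plus source construction, pulled out of load() so the
    // loop body becomes a single call per InputStruct.
    static MySqlSourceSpec buildMysqlSource(JdbcInput input) {
        String url = input.url;
        if (url == null || !url.startsWith(JDBC_PREFIX)) {
            throw new IllegalArgumentException("Invalid JDBC url: " + url);
        }
        URI uri;
        try {
            uri = new URI(url.substring(JDBC_PREFIX.length()));
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(String.format(
                      "Failed to parse url '%s' to get host and port", url), e);
        }
        return new MySqlSourceSpec(uri.getHost(), uri.getPort(), input.database,
                                   input.database + "." + input.table,
                                   input.username, input.password);
    }
}
```

With this in place, the loop in load() only maps each struct to a source and attaches the sink, which makes it easier to read and to unit-test the URL handling on its own.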



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java:
##########
@@ -76,4 +76,9 @@ public final class Constants {
     public static final int VERTEX_ID_LIMIT = 128;
     public static final String[] SEARCH_LIST = new String[]{":", "!"};
     public static final String[] TARGET_LIST = new String[]{"`:", "`!"};
+
+    public static final String HOST_PORT_REGEX = ".+://(.+):(\\d+)";
+    public static final String CDC_DATA = "data";
+    public static final String CDC_OP = "op";
+

Review Comment:
   The blank line can be removed.
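A short sketch of how the HOST_PORT_REGEX constant added in this hunk behaves on a typical JDBC URL. The class and method names are hypothetical. The pattern has no trailing ".*", so Matcher.matches() fails on URLs that carry a path such as "/db", and Matcher.find() is needed instead.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class HostPortRegexDemo {

    // Same pattern string as Constants.HOST_PORT_REGEX in the diff
    static final Pattern HOST_PORT = Pattern.compile(".+://(.+):(\\d+)");

    private HostPortRegexDemo() {
    }

    // Returns {host, port}, or throws IllegalArgumentException if no match.
    static String[] hostPort(String url) {
        Matcher m = HOST_PORT.matcher(url);
        // find() matches the "...://host:port" prefix and ignores the rest
        if (!m.find()) {
            throw new IllegalArgumentException("No host:port found in url " + url);
        }
        return new String[]{m.group(1), m.group(2)};
    }
}
```

Because the host group (.+) is greedy, it backtracks to the last ":" followed by digits. A URL with colon-digit sequences after the port (for example in query parameters) would therefore be split at the wrong place, which is one reason to prefer a URI parser.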



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                    "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+                synchronized (HugeGraphOutputFormat.this) {
+                    if (!closed) {
+                        try {
+                            flushAll();
+                        } catch (Exception e) {
+                            LOG.error("Failed to flush all data.", e);
+                        }
+                    }
+                }
+            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void flushAll() {
+        for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
+            List<String> graphElements = builder.getValue();
+            if (graphElements.size() > 0) {
+                flush(builder);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void writeRecord(T row) throws IOException {
+        for (Map.Entry<ElementBuilder, List<String>> builder :
+                this.builders.entrySet()) {
+            ElementMapping elementMapping = builder.getKey().mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+
+            // Add batch
+            List<String> graphElements = builder.getValue();
+            graphElements.add(row.toString());
+            if (graphElements.size() > elementMapping.batchSize()) {
+                flush(builder);
+            }
+        }
+    }
+
+    private void flush(Map.Entry<ElementBuilder, List<String>> builder) {
+        GraphManager g = loadContext.client().graph();
+        ElementBuilder elementBuilder = builder.getKey();
+        ElementMapping elementMapping = elementBuilder.mapping();
+        for (String row : builder.getValue()) {
+            JsonNode node;
+            try {
+                node = new ObjectMapper().readTree(row);
+            } catch (JsonProcessingException e) {
+                LOG.error("Failed to parse json {}", row, e);
+                continue;
+            }
+
+            JsonNode data = node.get(Constants.CDC_DATA);
+            String op = node.get(Constants.CDC_OP).asText();
+
+            String[] fields = struct.input().header();
+            String[] values = new String[data.size()];
+            for (int i = 0; i < fields.length; i++) {
+                values[i] = data.get(fields[i]).asText();
+            }
+            List<GraphElement> graphElements = builder.getKey().build(fields, values);
+            boolean isVertex = elementBuilder.mapping().type().isVertex();
+            switch (Envelope.Operation.forCode(op)) {
+                case READ:
+                case CREATE:
+                    if (isVertex) {
+                        g.addVertices((List<Vertex>) (Object) graphElements);
+                    } else {
+                        g.addEdges((List<Edge>) (Object) graphElements);
+                    }
+                    break;
+                case UPDATE:
+                    Map<String, UpdateStrategy> updateStrategyMap =
+                            elementMapping.updateStrategies();
+                    if (isVertex) {
+                        BatchVertexRequest.Builder req =
+                                new BatchVertexRequest.Builder();
+                        req.vertices((List<Vertex>) (Object) graphElements)
+                           .updatingStrategies(updateStrategyMap)
+                           .createIfNotExist(true);
+                        g.updateVertices(req.build());
+                    } else {
+                        BatchEdgeRequest.Builder req = new BatchEdgeRequest.Builder();
+                        req.edges((List<Edge>) (Object) graphElements)
+                           .updatingStrategies(updateStrategyMap)
+                           .checkVertex(this.loadOptions.checkVertex)
+                           .createIfNotExist(true);
+                        g.updateEdges(req.build());
+                    }
+                    break;
+                case DELETE:
+                    String id = graphElements.get(0).id().toString();
+                    if (isVertex) {
+                        g.removeVertex(id);
+                    } else {
+                        g.removeEdge(id);
+                    }
+                    break;
+                default:
+                    throw new RuntimeException("The type of `op` should be 'c' 'r' 'u' 'd' only");
+            }
+        }
+        builder.getValue().clear();
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!closed) {
+            closed = true;
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);

Review Comment:
   Use this.scheduledFuture here.
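A Flink-free sketch of the periodic-flush lifecycle that open() and close() implement, written with the this. prefix throughout. PeriodicFlushBuffer is hypothetical, and the final flush and scheduler shutdown in close() are assumptions: whether the PR's close() does the same is not visible in this excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical model: a background task flushes the buffer every intervalMs;
// close() cancels it, shuts the executor down, and flushes what is left.
final class PeriodicFlushBuffer {

    private final List<String> buffer = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>();
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;
    private volatile boolean closed = false;

    synchronized void open(long intervalMs) {
        if (intervalMs > 0) {
            this.scheduler = new ScheduledThreadPoolExecutor(1);
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
                // Same monitor as write()/close(), so a tick never races them
                synchronized (PeriodicFlushBuffer.this) {
                    if (!this.closed) {
                        this.flush();
                    }
                }
            }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void write(String row) {
        this.buffer.add(row);
    }

    private void flush() {
        this.flushed.addAll(this.buffer);
        this.buffer.clear();
    }

    synchronized List<String> flushed() {
        return new ArrayList<>(this.flushed);
    }

    synchronized boolean isShutdown() {
        return this.scheduler == null || this.scheduler.isShutdown();
    }

    synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (this.scheduledFuture != null) {
            // cancel(false) lets a running tick finish; it will see closed == true
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
        // Final flush so rows buffered after the last tick are not lost
        this.flush();
    }
}
```

Shutting down the executor matters because ScheduledThreadPoolExecutor's default threads are non-daemon and would otherwise outlive the task.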



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,223 @@
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                    "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+                synchronized (HugeGraphOutputFormat.this) {
+                    if (!closed) {
+                        try {
+                            flushAll();
+                        } catch (Exception e) {
+                            LOG.error("Failed to flush all data.", e);
+                        }
+                    }
+                }
+            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void flushAll() {
+        for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
+            List<String> graphElements = builder.getValue();
+            if (graphElements.size() > 0) {
+                flush(builder);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void writeRecord(T row) throws IOException {
+        for (Map.Entry<ElementBuilder, List<String>> builder :
+                this.builders.entrySet()) {
+            ElementMapping elementMapping = builder.getKey().mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+
+            // Add batch
+            List<String> graphElements = builder.getValue();
+            graphElements.add(row.toString());
+            if (graphElements.size() > elementMapping.batchSize()) {
+                flush(builder);
+            }
+        }
+    }
+
+    private void flush(Map.Entry<ElementBuilder, List<String>> builder) {
+        GraphManager g = loadContext.client().graph();
+        ElementBuilder elementBuilder = builder.getKey();
+        ElementMapping elementMapping = elementBuilder.mapping();
+        for (String row : builder.getValue()) {
+            JsonNode node;
+            try {
+                node = new ObjectMapper().readTree(row);
+            } catch (JsonProcessingException e) {
+                LOG.error("Failed to parse json {}", row, e);
+                continue;
+            }
+
+            JsonNode data = node.get(Constants.CDC_DATA);
+            String op = node.get(Constants.CDC_OP).asText();
+
+            String[] fields = struct.input().header();
+            String[] values = new String[data.size()];
+            for (int i = 0; i < fields.length; i++) {
+                values[i] = data.get(fields[i]).asText();
+            }
+            List<GraphElement> graphElements = builder.getKey().build(fields, values);
+            boolean isVertex = elementBuilder.mapping().type().isVertex();
+            switch (Envelope.Operation.forCode(op)) {
+                case READ:
+                case CREATE:
+                    if (isVertex) {
+                        g.addVertices((List<Vertex>) (Object) graphElements);
+                    } else {
+                        g.addEdges((List<Edge>) (Object) graphElements);
+                    }
+                    break;
+                case UPDATE:
+                    Map<String, UpdateStrategy> updateStrategyMap =
+                            elementMapping.updateStrategies();
+                    if (isVertex) {
+                        BatchVertexRequest.Builder req =
+                                new BatchVertexRequest.Builder();
+                        req.vertices((List<Vertex>) (Object) graphElements)
+                           .updatingStrategies(updateStrategyMap)
+                           .createIfNotExist(true);
+                        g.updateVertices(req.build());
+                    } else {
+                        BatchEdgeRequest.Builder req = new BatchEdgeRequest.Builder();
+                        req.edges((List<Edge>) (Object) graphElements)
+                           .updatingStrategies(updateStrategyMap)
+                           .checkVertex(this.loadOptions.checkVertex)
+                           .createIfNotExist(true);
+                        g.updateEdges(req.build());
+                    }
+                    break;
+                case DELETE:
+                    String id = graphElements.get(0).id().toString();
+                    if (isVertex) {
+                        g.removeVertex(id);
+                    } else {
+                        g.removeEdge(id);
+                    }
+                    break;
+                default:
+                    throw new RuntimeException("The type of `op` should be 'c' 'r' 'u' 'd' only");

Review Comment:
   Throw IllegalArgumentException here?
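A sketch of resolving the op code to a graph action with an explicit IllegalArgumentException. GraphAction and OpDispatch are hypothetical names. One detail worth checking: if Envelope.Operation.forCode() returns null for an unknown code (confirm this against the Debezium version in use), then switching on that null throws NullPointerException before the default branch is ever reached. A lookup that checks for a missing entry avoids this.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical actions mirroring the branches of the switch in flush().
enum GraphAction { ADD, UPDATE, REMOVE }

final class OpDispatch {

    private static final Map<String, GraphAction> ACTIONS = new HashMap<>();

    static {
        ACTIONS.put("r", GraphAction.ADD);     // snapshot read
        ACTIONS.put("c", GraphAction.ADD);     // insert
        ACTIONS.put("u", GraphAction.UPDATE);  // update (createIfNotExist in the diff)
        ACTIONS.put("d", GraphAction.REMOVE);  // delete
    }

    private OpDispatch() {
    }

    // Unknown or null codes become IllegalArgumentException, never NPE.
    static GraphAction actionFor(String op) {
        GraphAction action = ACTIONS.get(op);
        if (action == null) {
            throw new IllegalArgumentException(String.format(
                      "The type of `op` should be 'c' 'r' 'u' 'd' only, but got '%s'", op));
        }
        return action;
    }
}
```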



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,223 @@
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                    "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+                synchronized (HugeGraphOutputFormat.this) {
+                    if (!closed) {
+                        try {
+                            flushAll();
+                        } catch (Exception e) {
+                            LOG.error("Failed to flush all data.", e);
+                        }
+                    }
+                }
+            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void flushAll() {
+        for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
+            List<String> graphElements = builder.getValue();
+            if (graphElements.size() > 0) {
+                flush(builder);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void writeRecord(T row) throws IOException {
+        for (Map.Entry<ElementBuilder, List<String>> builder :
+                this.builders.entrySet()) {
+            ElementMapping elementMapping = builder.getKey().mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+
+            // Add batch
+            List<String> graphElements = builder.getValue();
+            graphElements.add(row.toString());
+            if (graphElements.size() > elementMapping.batchSize()) {
+                flush(builder);
+            }
+        }
+    }
+
+    private void flush(Map.Entry<ElementBuilder, List<String>> builder) {
+        GraphManager g = loadContext.client().graph();
+        ElementBuilder elementBuilder = builder.getKey();
+        ElementMapping elementMapping = elementBuilder.mapping();
+        for (String row : builder.getValue()) {
+            JsonNode node;
+            try {
+                node = new ObjectMapper().readTree(row);
+            } catch (JsonProcessingException e) {
+                LOG.error("Failed to parse json {}", row, e);

Review Comment:
   throw ParseException?
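One way to act on this, sketched with stdlib Java only. A hypothetical unchecked `RowParseException` stands in for the loader's `ParseException`, whose constructors this diff doesn't show; the name is changed only to avoid clashing with `java.text.ParseException`. A trivial `key=value` parser stands in for Jackson's `readTree`. What matters is the control flow: wrap and rethrow instead of `LOG.error(...)` plus `continue`, so a malformed CDC record fails the task rather than being silently dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the loader's ParseException (real signature may differ).
class RowParseException extends RuntimeException {
    RowParseException(String row, Throwable cause) {
        super("Failed to parse row: " + row, cause);
    }
}

class RowParser {
    // Stand-in for ObjectMapper.readTree(row): parses "k1=v1,k2=v2" rows.
    static Map<String, String> parse(String row) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String pair : row.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed pair: '" + pair + "'");
            }
            fields.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return fields;
    }

    // Instead of logging and skipping, wrap the failure and rethrow it so the
    // caller (and, in Flink, the sink task) sees the bad row.
    static List<Map<String, String>> parseAll(List<String> rows) {
        List<Map<String, String>> parsed = new ArrayList<>();
        for (String row : rows) {
            try {
                parsed.add(parse(row));
            } catch (IllegalArgumentException e) {
                throw new RowParseException(row, e);
            }
        }
        return parsed;
    }
}
```

Whether a bad row should fail the job or merely be counted and skipped is a policy choice this sketch does not settle.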



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                    "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+                synchronized (HugeGraphOutputFormat.this) {
+                    if (!closed) {
+                        try {
+                            flushAll();
+                        } catch (Exception e) {
+                            LOG.error("Failed to flush all data.", e);
+                        }
+                    }
+                }
+            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void flushAll() {
+        for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
+            List<String> graphElements = builder.getValue();
+            if (graphElements.size() > 0) {
+                flush(builder);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void writeRecord(T row) throws IOException {
+        for (Map.Entry<ElementBuilder, List<String>> builder :
+                this.builders.entrySet()) {
+            ElementMapping elementMapping = builder.getKey().mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+
+            // Add batch
+            List<String> graphElements = builder.getValue();
+            graphElements.add(row.toString());
+            if (graphElements.size() > elementMapping.batchSize()) {
+                flush(builder);
+            }
+        }
+    }
+
+    private void flush(Map.Entry<ElementBuilder, List<String>> builder) {
+        GraphManager g = loadContext.client().graph();
+        ElementBuilder elementBuilder = builder.getKey();
+        ElementMapping elementMapping = elementBuilder.mapping();
+        for (String row : builder.getValue()) {
+            JsonNode node;
+            try {
+                node = new ObjectMapper().readTree(row);
+            } catch (JsonProcessingException e) {
+                LOG.error("Failed to parse json {}", row, e);
+                continue;
+            }
+
+            JsonNode data = node.get(Constants.CDC_DATA);
+            String op = node.get(Constants.CDC_OP).asText();
+
+            String[] fields = struct.input().header();
+            String[] values = new String[data.size()];
+            for (int i = 0; i < fields.length; i++) {
+                values[i] = data.get(fields[i]).asText();
+            }
+            List<GraphElement> graphElements = builder.getKey().build(fields, values);
+            boolean isVertex = elementBuilder.mapping().type().isVertex();
+            switch (Envelope.Operation.forCode(op)) {
+                case READ:
+                case CREATE:
+                    if (isVertex) {
+                        g.addVertices((List<Vertex>) (Object) graphElements);
+                    } else {
+                        g.addEdges((List<Edge>) (Object) graphElements);
+                    }
+                    break;
+                case UPDATE:
+                    Map<String, UpdateStrategy> updateStrategyMap =
+                            elementMapping.updateStrategies();
+                    if (isVertex) {
+                        BatchVertexRequest.Builder req =
+                                new BatchVertexRequest.Builder();
+                        req.vertices((List<Vertex>) (Object) graphElements)
+                           .updatingStrategies(updateStrategyMap)
+                           .createIfNotExist(true);
+                        g.updateVertices(req.build());
+                    } else {
+                        BatchEdgeRequest.Builder req = new BatchEdgeRequest.Builder();
+                        req.edges((List<Edge>) (Object) graphElements)
+                           .updatingStrategies(updateStrategyMap)
+                           .checkVertex(this.loadOptions.checkVertex)
+                           .createIfNotExist(true);
+                        g.updateEdges(req.build());
+                    }
+                    break;
+                case DELETE:
+                    String id = graphElements.get(0).id().toString();
+                    if (isVertex) {
+                        g.removeVertex(id);
+                    } else {
+                        g.removeEdge(id);
+                    }
+                    break;
+                default:
+                    throw new RuntimeException("The type of `op` should be 'c' 'r' 'u' 'd' only");
+            }
+        }
+        builder.getValue().clear();
+    }
+
+    @Override
+    public synchronized void close() {
+        if (!closed) {

Review Comment:
   expect `this.closed` style (access instance fields via `this.`)
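A minimal sketch of `close()` written in the `this.`-prefixed style, mirroring the open/write/close lifecycle of `HugeGraphOutputFormat`. `BufferedSink` and its `Consumer` sink are hypothetical stand-ins for the output format and the HugeGraph client calls. Unlike the diff's `size() > batchSize()`, which waits for one extra row, this sketch flushes once the buffer reaches `batchSize`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical buffered sink; `sink` stands in for the HugeGraph batch writes.
class BufferedSink {
    private final List<String> buffer = new ArrayList<>();
    private final int batchSize;
    private final Consumer<List<String>> sink;
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;
    private volatile boolean closed = false;

    BufferedSink(int batchSize, Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    void open(long flushIntervalMs) {
        if (flushIntervalMs <= 0) {
            return; // size-based flushing only
        }
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
            synchronized (this) { // `this` is the enclosing BufferedSink
                if (!this.closed) {
                    try {
                        this.flush();
                    } catch (RuntimeException e) {
                        // An uncaught exception would suppress all later runs.
                        System.err.println("Failed to flush buffered rows: " + e);
                    }
                }
            }
        }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void write(String row) {
        if (this.closed) {
            throw new IllegalStateException("Sink already closed");
        }
        this.buffer.add(row);
        if (this.buffer.size() >= this.batchSize) {
            this.flush();
        }
    }

    private void flush() {
        if (this.buffer.isEmpty()) {
            return;
        }
        this.sink.accept(new ArrayList<>(this.buffer));
        this.buffer.clear();
    }

    synchronized void close() {
        if (this.closed) {
            return; // idempotent
        }
        this.closed = true;
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
        }
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        this.flush(); // push out whatever is still buffered
    }
}
```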



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+    private static final String JDBC_PREFIX = "jdbc:";
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+            JDBCSource input = (JDBCSource) struct.input();
+            String url = input.url();
+            String host;
+            int port;
+            try {
+                URIBuilder uriBuilder = new URIBuilder(url.substring(JDBC_PREFIX.length()));
+                host = uriBuilder.getHost();
+                port = uriBuilder.getPort();
+            } catch (URISyntaxException e) {
+                throw new RuntimeException(
+                        String.format("Failed to parse Url(%s) to get hostName and port",
+                                      url), e);
+            }
+
+            MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
+                    .hostname(host)
+                    .port(port)
+                    .databaseList(input.database())
+                    .tableList(input.database() + "." + input.table())
+                    .username(input.username())
+                    .password(input.password())
+                    .deserializer(new HugeGraphDeserialization())
+                    .build();
+
+            DataStreamSource<String> source =
+                    env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
+
+            HugeGraphOutputFormat<Object> format = new HugeGraphOutputFormat<>(struct, options);
+            source.addSink(new HugeGraphSinkFunction<>(format)).setParallelism(1);
+        }
+        env.enableCheckpointing(3000);
+        try {
+            env.execute("flink-cdc-hugegraph");
+        } catch (Exception e) {
+            Printer.printError("Failed to execute flink.", e);

Review Comment:
   throw LoadException?
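A minimal sketch of the suggestion, assuming a hypothetical unchecked `LoadException(String, Throwable)`; the loader's actual exception class may have different constructors. `JobRunner.run` is an illustrative helper, not part of the diff. The idea is that the failure propagates out of `load()`/`main()` instead of only being printed.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for the loader's LoadException.
class LoadException extends RuntimeException {
    LoadException(String message, Throwable cause) {
        super(message, cause);
    }
}

class JobRunner {
    // Runs the job and rethrows any failure wrapped in LoadException,
    // keeping the original exception as the cause.
    static <T> T run(String jobName, Callable<T> job) {
        try {
            return job.call();
        } catch (Exception e) {
            throw new LoadException(String.format("Failed to execute %s", jobName), e);
        }
    }
}
```

In `load()` this could be used as `JobRunner.run("flink-cdc-hugegraph", () -> env.execute("flink-cdc-hugegraph"));`. `StreamExecutionEnvironment.execute(String)` returns a `JobExecutionResult` and declares `throws Exception`, so it fits a `Callable`.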



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                    "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+                synchronized (HugeGraphOutputFormat.this) {
+                    if (!closed) {
+                        try {
+                            flushAll();
+                        } catch (Exception e) {
+                            LOG.error("Failed to flush all data.", e);
+                        }
+                    }
+                }
+            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void flushAll() {
+        for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
+            List<String> graphElements = builder.getValue();
+            if (graphElements.size() > 0) {
+                flush(builder);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void writeRecord(T row) throws IOException {
+        for (Map.Entry<ElementBuilder, List<String>> builder :
+                this.builders.entrySet()) {
+            ElementMapping elementMapping = builder.getKey().mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+
+            // Add batch
+            List<String> graphElements = builder.getValue();
+            graphElements.add(row.toString());
+            if (graphElements.size() > elementMapping.batchSize()) {
+                flush(builder);
+            }
+        }
+    }
+
+    private void flush(Map.Entry<ElementBuilder, List<String>> builder) {

Review Comment:
   can be split into 2 params: `ElementBuilder builder` and `List<String> rows`
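A sketch of the two-parameter shape. `RowBuilder` is a hypothetical stand-in for `ElementBuilder`, and the "flush" only records labelled rows instead of calling the HugeGraph client. The benefit is that `flush` no longer depends on the rows being stored as a `Map.Entry`. The caller unpacks the entry once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ElementBuilder: only what the sketch needs.
interface RowBuilder {
    String label();
}

class Flusher {
    final List<String> written = new ArrayList<>();

    // Explicit builder + rows instead of Map.Entry<ElementBuilder, List<String>>.
    void flush(RowBuilder builder, List<String> rows) {
        for (String row : rows) {
            this.written.add(builder.label() + ":" + row);
        }
        rows.clear();
    }

    void flushAll(Map<RowBuilder, List<String>> builders) {
        for (Map.Entry<RowBuilder, List<String>> entry : builders.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                this.flush(entry.getKey(), entry.getValue());
            }
        }
    }
}
```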



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                    "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
+                synchronized (HugeGraphOutputFormat.this) {
+                    if (!closed) {
+                        try {
+                            flushAll();
+                        } catch (Exception e) {
+                            LOG.error("Failed to flush all data.", e);
+                        }
+                    }
+                }
+            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void flushAll() {
+        for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
+            List<String> graphElements = builder.getValue();
+            if (graphElements.size() > 0) {
+                flush(builder);
+            }
+        }
+    }
+
+    @Override
+    public synchronized void writeRecord(T row) throws IOException {
+        for (Map.Entry<ElementBuilder, List<String>> builder :
+                this.builders.entrySet()) {
+            ElementMapping elementMapping = builder.getKey().mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+
+            // Add batch
+            List<String> graphElements = builder.getValue();
+            graphElements.add(row.toString());
+            if (graphElements.size() > elementMapping.batchSize()) {
+                flush(builder);
+            }
+        }
+    }
+
+    private void flush(Map.Entry<ElementBuilder, List<String>> builder) {
+        GraphManager g = loadContext.client().graph();
+        ElementBuilder elementBuilder = builder.getKey();
+        ElementMapping elementMapping = elementBuilder.mapping();
+        for (String row : builder.getValue()) {
+            JsonNode node;
+            try {
+                node = new ObjectMapper().readTree(row);
+            } catch (JsonProcessingException e) {
+                LOG.error("Failed to parse json {}", row, e);
+                continue;
+            }
+
+            JsonNode data = node.get(Constants.CDC_DATA);
+            String op = node.get(Constants.CDC_OP).asText();
+
+            String[] fields = struct.input().header();
+            String[] values = new String[data.size()];
+            for (int i = 0; i < fields.length; i++) {
+                values[i] = data.get(fields[i]).asText();
+            }
+            List<GraphElement> graphElements = builder.getKey().build(fields, values);

Review Comment:
   add a method buildGraphData() for lines 149~168?
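A sketch of one piece such a helper could own: mapping the CDC row's columns onto the configured header order. `extractValues` is a hypothetical name, and a `Map<String, String>` stands in for the `JsonNode` under `Constants.CDC_DATA`. The diff sizes `values` with `data.size()` but fills it by `fields.length`. If the row has fewer columns than the header, that index can run past the array. If it has more, trailing `null`s are passed to `build(fields, values)`. Sizing by the header avoids both cases.

```java
import java.util.Map;

class GraphDataExtractor {
    // Returns one value per header column, in header order.
    // `data` stands in for the JsonNode holding the changed row.
    static String[] extractValues(String[] header, Map<String, String> data) {
        String[] values = new String[header.length]; // sized by header, not data
        for (int i = 0; i < header.length; i++) {
            String value = data.get(header[i]);
            if (value == null) {
                throw new IllegalArgumentException(
                        "Missing column '" + header[i] + "' in CDC row");
            }
            values[i] = value;
        }
        return values;
    }
}
```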



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java:
##########
@@ -76,4 +76,9 @@ public final class Constants {
     public static final int VERTEX_ID_LIMIT = 128;
     public static final String[] SEARCH_LIST = new String[]{":", "!"};
     public static final String[] TARGET_LIST = new String[]{"`:", "`!"};
+
+    public static final String HOST_PORT_REGEX = ".+://(.+):(\\d+)";

Review Comment:
   unused?
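For comparison, here are the two ways of getting host and port out of the JDBC URL seen in this thread. The earlier revision of `HugeGraphFlinkCDCLoader` imports `Pattern.compile` and `Matcher`. The reviewed revision strips `jdbc:` and uses Apache's `URIBuilder`, which would leave `HOST_PORT_REGEX` unused. The sketch below uses `java.net.URI` in place of `URIBuilder`. `JdbcHostPort` is a hypothetical helper. The regex relies on greedy backtracking to the last `:<digits>`, so a query string containing something like `a:1` would make it mis-split the URL.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class JdbcHostPort {
    static final String JDBC_PREFIX = "jdbc:";
    // Same pattern as Constants.HOST_PORT_REGEX in the diff.
    static final Pattern HOST_PORT = Pattern.compile(".+://(.+):(\\d+)");

    // Option 1: what the regex constant would be used for.
    static String[] viaRegex(String url) {
        Matcher m = HOST_PORT.matcher(url);
        if (!m.find()) {
            throw new IllegalArgumentException("No host:port in " + url);
        }
        return new String[]{m.group(1), m.group(2)};
    }

    // Option 2: strip "jdbc:" and let java.net.URI parse the rest
    // (getPort() returns -1 when no port is given).
    static String[] viaUri(String url) {
        try {
            URI uri = new URI(url.substring(JDBC_PREFIX.length()));
            return new String[]{uri.getHost(), String.valueOf(uri.getPort())};
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid JDBC URL: " + url, e);
        }
    }
}
```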





[GitHub] [incubator-hugegraph-toolchain] javeme merged pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
javeme merged PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291




[GitHub] [incubator-hugegraph-toolchain] javeme commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
javeme commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r889944921


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import static java.util.regex.Pattern.compile;
+
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+            JDBCSource input = (JDBCSource) struct.input();
+            String host = "";
+            int port = -1;
+            Matcher m = compile(Constants.HOST_PORT_REGEX).matcher(input.url());

Review Comment:
   Got it. Alternatively, try URIBuilder: https://github.com/apache/incubator-hugegraph/blob/74c0e2dc8a724b23553037fdbd1fed05f4aac0b8/hugegraph-mysql/src/main/java/com/baidu/hugegraph/backend/store/mysql/MysqlSessions.java#L233





[GitHub] [incubator-hugegraph-toolchain] simon824 commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
simon824 commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r890051246


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import static java.util.regex.Pattern.compile;
+
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+            JDBCSource input = (JDBCSource) struct.input();
+            String host = "";
+            int port = -1;
+            Matcher m = compile(Constants.HOST_PORT_REGEX).matcher(input.url());

Review Comment:
   it works, thanks.





[GitHub] [incubator-hugegraph-toolchain] javeme commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
javeme commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r890140132


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphDeserialization.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphDeserialization implements DebeziumDeserializationSchema<String> {

Review Comment:
   Keep a blank line after the class declaration.



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+    private static final String JDBC_PREFIX = "jdbc:";
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }

Review Comment:
   > Added lines #L54 - L60 were not covered by tests
   
   Can we add some tests for this code?





[GitHub] [incubator-hugegraph-toolchain] simon824 commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
simon824 commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r889930865


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import static java.util.regex.Pattern.compile;
+
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+            JDBCSource input = (JDBCSource) struct.input();
+            String host = "";
+            int port = -1;
+            Matcher m = compile(Constants.HOST_PORT_REGEX).matcher(input.url());

Review Comment:
   `URL` cannot parse a URL like `jdbc:mysql://127.0.0.1:3306`.
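Background on this point: `java.net.URL` rejects `jdbc:mysql://127.0.0.1:3306` with a `MalformedURLException` because no handler exists for the `jdbc` protocol, and `java.net.URI` accepts it only as an opaque URI whose `getHost()` is null. Stripping the `jdbc:` prefix first, as the later revision does with `JDBC_PREFIX` and `URIBuilder`, leaves a hierarchical `mysql://host:port` URI. The sketch below swaps in the JDK's `java.net.URI` for Apache HttpClient's `URIBuilder` to stay dependency-free; `JdbcUrls`, `HostPort` and `hostPort` are hypothetical names. The null-host check guards against authorities that `java.net.URI` cannot parse as host and port, such as hostnames containing underscores.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Sketch using java.net.URI instead of Apache HttpClient's URIBuilder.
// JdbcUrls, HostPort and hostPort() are hypothetical names.
final class JdbcUrls {

    private static final String JDBC_PREFIX = "jdbc:";

    static final class HostPort {
        final String host;
        final int port;  // -1 when the URL has no explicit port

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    private JdbcUrls() {
    }

    static HostPort hostPort(String jdbcUrl) {
        if (!jdbcUrl.startsWith(JDBC_PREFIX)) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        URI uri;
        try {
            // "jdbc:mysql://h:p" parses as an opaque URI with scheme "jdbc" and a
            // null host; without the prefix, "mysql://h:p" is hierarchical.
            uri = new URI(jdbcUrl.substring(JDBC_PREFIX.length()));
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid JDBC URL: " + jdbcUrl, e);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException("No host in JDBC URL: " + jdbcUrl);
        }
        return new HostPort(uri.getHost(), uri.getPort());
    }

    public static void main(String[] args) throws URISyntaxException {
        String url = "jdbc:mysql://127.0.0.1:3306";
        System.out.println(new URI(url).getHost());   // prints: null (opaque URI)
        HostPort hp = hostPort(url);
        System.out.println(hp.host + ":" + hp.port);  // prints: 127.0.0.1:3306
    }
}
```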
   





[GitHub] [incubator-hugegraph-toolchain] seagle-yuan commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
seagle-yuan commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r895459203


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.exception.LoadException;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);

Review Comment:
   Can you change `LoggerFactory` to `com.baidu.hugegraph.util.Log`?





[GitHub] [incubator-hugegraph-toolchain] seagle-yuan commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
seagle-yuan commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r897922565


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -209,7 +208,7 @@ private void parse(Row row,
         graphElements.addAll(elements);
     }
 
-    private void sink(Map.Entry<ElementBuilder, List<GraphElement>> builderMap,
+    private void flush(Map.Entry<ElementBuilder, List<GraphElement>> builderMap,
                       GraphManager g, boolean isCheckVertex) {

Review Comment:
   ![image](https://user-images.githubusercontent.com/13429553/173827498-5bf7745c-bdd8-42b3-9915-c02512b7a752.png)
   Can you align `GraphManager` with `Map`?



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.exception.LoadException;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = Log.logger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(this.loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : this.struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, this.struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : this.struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, this.struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                                 "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
+                    this::flushAll, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private synchronized void flushAll() {
+        if (this.closed) {
+            return;
+        }
+        try {
+            for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
+                List<String> graphElements = builder.getValue();
+                if (graphElements.size() > 0) {
+                    flush(builder.getKey(), graphElements);
+                }
+            }
+        } catch (Exception e) {
+            throw new LoadException("Failed to flush all data.", e);
+        }
+    }
+
+    @Override
+    public synchronized void writeRecord(T row) throws IOException {
+        for (Map.Entry<ElementBuilder, List<String>> builder :
+                this.builders.entrySet()) {
+            ElementMapping elementMapping = builder.getKey().mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+
+            // Add batch
+            List<String> graphElements = builder.getValue();
+            graphElements.add(row.toString());
+            if (graphElements.size() > elementMapping.batchSize()) {
+                flush(builder.getKey(), builder.getValue());
+            }
+        }
+    }
+
+    private Tuple2<String, List<GraphElement>> buildGraphData(ElementBuilder elementBuilder,
+                                                              String row) {
+        JsonNode node;
+        try {
+            node = new ObjectMapper().readTree(row);
+        } catch (JsonProcessingException e) {
+            throw new ParseException(row, e);
+        }
+        JsonNode data = node.get(Constants.CDC_DATA);
+        String op = node.get(Constants.CDC_OP).asText();
+        String[] fields = this.struct.input().header();
+        String[] values = new String[data.size()];
+        for (int i = 0; i < fields.length; i++) {
+            values[i] = data.get(fields[i]).asText();
+        }
+        return Tuple2.of(op, elementBuilder.build(fields, values));
+    }
+
+    private void flush(ElementBuilder elementBuilder, List<String> rows) {
+        GraphManager g = this.loadContext.client().graph();
+        ElementMapping elementMapping = elementBuilder.mapping();
+        for (String row : rows) {
+            Tuple2<String, List<GraphElement>> graphData = buildGraphData(elementBuilder, row);
+            List<GraphElement> graphElements = graphData.f1;
+            boolean isVertex = elementBuilder.mapping().type().isVertex();
+            switch (Envelope.Operation.forCode(graphData.f0)) {
+                case READ:

Review Comment:
   Can you update the code to comply with the Checkstyle [FallThrough rule](https://checkstyle.sourceforge.io/config_coding.html#FallThrough)?

   Every `case` block must contain a `break`, `return`, `throw`, or `continue` statement.
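A minimal sketch of a switch that passes Checkstyle's FallThrough check. `Op` is a local stand-in that mirrors the codes of Debezium's `Envelope.Operation` (`r`, `c`, `u`, `d`), and the insert/update/delete actions are assumptions about what `flush()` should do, not the PR's code; `CdcDispatch` and `actionFor` are hypothetical names. Stacked case labels with no statements between them, such as `case READ: case CREATE:`, are not reported by that check; what it flags is a case that contains statements yet runs into the next label.

```java
// Sketch of a FallThrough-clean dispatch on CDC operation codes.
// Op is a local stand-in for Debezium's Envelope.Operation; CdcDispatch,
// actionFor() and the action names are hypothetical.
final class CdcDispatch {

    enum Op {
        READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

        private final String code;

        Op(String code) {
            this.code = code;
        }

        static Op forCode(String code) {
            for (Op op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown operation code: " + code);
        }
    }

    private CdcDispatch() {
    }

    /** Maps an operation code to the graph action it (hypothetically) triggers. */
    static String actionFor(String code) {
        String action;
        switch (Op.forCode(code)) {
            case READ:    // assumed: snapshot reads and inserts both become new elements
            case CREATE:
                action = "insert";
                break;
            case UPDATE:
                action = "update";
                break;
            case DELETE:
                action = "delete";
                break;
            default:
                throw new IllegalStateException("Unhandled operation: " + code);
        }
        return action;
    }

    public static void main(String[] args) {
        for (String code : new String[]{"r", "c", "u", "d"}) {
            System.out.println(code + " -> " + actionFor(code));
        }
    }
}
```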



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphDeserialization.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.util.Log;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphDeserialization implements DebeziumDeserializationSchema<String> {
+
+    private static final Logger LOG = Log.logger(HugeGraphDeserialization.class);
+
+    @Override
+    public void deserialize(SourceRecord sourceRecord,
+                            Collector<String> collector) throws Exception {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode result = mapper.createObjectNode();
+
+        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
+        String op = operation.code();
+        Struct value = (Struct) sourceRecord.value();
+        Struct data;
+        switch (operation) {
+            case DELETE:
+                data = value.getStruct("before");
+                break;
+            case CREATE:
+            case READ:
+            case UPDATE:

Review Comment:
   
   https://checkstyle.sourceforge.io/config_coding.html#FallThrough
   Every case block that contains code must end with `break`, `return`, `throw`, or `continue`.
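   A minimal sketch of a switch that passes Checkstyle's FallThrough check, assuming a hypothetical `CdcOp` enum in place of Debezium's `Envelope.Operation`. Empty labels are grouped, and every case that has code ends in `return` or `throw`.

```java
// Hypothetical stand-in for Debezium's Envelope.Operation (codes c, r, u, d).
enum CdcOp { CREATE, READ, UPDATE, DELETE }

final class FallThroughExample {

    // Returns which half of a CDC change record holds the row data.
    static String dataField(CdcOp op) {
        switch (op) {
            case DELETE:
                // Non-empty case: ends with return, so nothing falls through
                return "before";
            case CREATE: // empty labels may be grouped; FallThrough ignores them
            case READ:
            case UPDATE:
                return "after";
            default:
                throw new IllegalArgumentException("Unsupported operation: " + op);
        }
    }
}
```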
   





[GitHub] [incubator-hugegraph-toolchain] javeme commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
javeme commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r891871032


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -99,7 +78,31 @@ public void load() {
         try {
             env.execute("flink-cdc-hugegraph");
         } catch (Exception e) {
-            Printer.printError("Failed to execute flink.", e);
+            throw new LoadException("Failed to execute flink", e);
+        }
+    }
+
+    private MySqlSource<String> buildMysqlSource(InputStruct struct) {
+        JDBCSource input = (JDBCSource) struct.input();
+        String url = input.url();
+        String host;
+        int port;
+        try {
+            URIBuilder uriBuilder = new URIBuilder(url.substring(JDBC_PREFIX.length()));
+            host = uriBuilder.getHost();
+            port = uriBuilder.getPort();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(

Review Comment:
   Consider throwing `IllegalArgumentException` here instead of a plain `RuntimeException`.
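   A minimal sketch of the suggestion, using the JDK's `java.net.URI` instead of Apache `URIBuilder` (a swap made only to keep the example self-contained). The class name `JdbcUrlParser` and the fallback to MySQL's default port 3306 are assumptions for illustration.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class JdbcUrlParser {

    private static final String JDBC_PREFIX = "jdbc:";

    // Simple holder for the parsed host and port.
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Extracts host and port from a URL such as "jdbc:mysql://127.0.0.1:3306/db".
    static HostPort parse(String url) {
        if (url == null || !url.startsWith(JDBC_PREFIX)) {
            throw new IllegalArgumentException(String.format("Invalid JDBC URL: %s", url));
        }
        URI uri;
        try {
            // Drop "jdbc:" so the rest parses as a hierarchical URI: mysql://host:port/db
            uri = new URI(url.substring(JDBC_PREFIX.length()));
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(
                    String.format("Failed to parse URL(%s) to get host name and port", url), e);
        }
        if (uri.getHost() == null) {
            throw new IllegalArgumentException(String.format("No host in URL(%s)", url));
        }
        // URI.getPort() returns -1 when no port is given; 3306 is an assumed default
        int port = uri.getPort() == -1 ? 3306 : uri.getPort();
        return new HostPort(uri.getHost(), port);
    }
}
```

A malformed URL now surfaces as an `IllegalArgumentException` that keeps the original `URISyntaxException` as its cause.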



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -99,7 +78,31 @@ public void load() {
         try {
             env.execute("flink-cdc-hugegraph");
         } catch (Exception e) {
-            Printer.printError("Failed to execute flink.", e);
+            throw new LoadException("Failed to execute flink", e);
+        }
+    }
+
+    private MySqlSource<String> buildMysqlSource(InputStruct struct) {
+        JDBCSource input = (JDBCSource) struct.input();
+        String url = input.url();
+        String host;
+        int port;
+        try {
+            URIBuilder uriBuilder = new URIBuilder(url.substring(JDBC_PREFIX.length()));
+            host = uriBuilder.getHost();
+            port = uriBuilder.getPort();
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(
+                    String.format("Failed to parse Url(%s) to get hostName and port", url), e);
         }
+        return MySqlSource.<String>builder()
+                .hostname(host)

Review Comment:
   Align the chained method calls on the `.`.



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -102,26 +104,26 @@ public void open(int taskNumber, int numTasks) {
         int flushIntervalMs = this.loadOptions.flushIntervalMs;
         if (flushIntervalMs > 0) {
             this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
-                    "hugegraph-streamload-outputformat"));
-            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
-                synchronized (HugeGraphOutputFormat.this) {
-                    if (!closed) {
-                        try {
-                            flushAll();
-                        } catch (Exception e) {
-                            LOG.error("Failed to flush all data.", e);
-                        }
-                    }
-                }
-            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+                                 "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
+                    this::flushAll, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
         }
     }
 
     private void flushAll() {
-        for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
-            List<String> graphElements = builder.getValue();
-            if (graphElements.size() > 0) {
-                flush(builder);
+        synchronized (HugeGraphOutputFormat.this) {

Review Comment:
   Should `flushAll()` be marked `synchronized` instead of wrapping its body in a `synchronized` block?
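   A minimal sketch of the suggestion, using a hypothetical `BufferedSink` class as a simplified stand-in for `HugeGraphOutputFormat`. A `synchronized` instance method locks the same monitor as a `synchronized (...this)` block around the whole body, so the scheduler can take a plain method reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for the buffering in HugeGraphOutputFormat.
class BufferedSink {

    private final List<String> buffer = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>();
    private boolean closed = false; // only accessed while holding the lock

    // Called by the task thread for each incoming record.
    synchronized void write(String row) {
        this.buffer.add(row);
    }

    // Locks on `this`, equivalent to synchronized (BufferedSink.this) { ... }.
    synchronized void flushAll() {
        if (this.closed) {
            return; // early return instead of nesting the body in if (!closed)
        }
        this.flushed.addAll(this.buffer); // placeholder for sending a batch
        this.buffer.clear();
    }

    synchronized void markClosed() {
        this.closed = true;
    }

    synchronized List<String> flushed() {
        return new ArrayList<>(this.flushed);
    }

    // Periodic flush from a background thread, as open() does; a method
    // reference is enough because flushAll() takes the lock itself.
    ScheduledFuture<?> schedule(ScheduledExecutorService scheduler, long intervalMs) {
        return scheduler.scheduleWithFixedDelay(this::flushAll, intervalMs, intervalMs,
                                                TimeUnit.MILLISECONDS);
    }
}
```

One point worth checking: per the `ScheduledExecutorService.scheduleWithFixedDelay` contract, if one run throws, later runs are suppressed, so a `flushAll()` that rethrows (for example as `LoadException`) would silently stop the periodic flush.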



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -204,18 +207,19 @@ private void flush(Map.Entry<ElementBuilder, List<String>> builder) {
                     }
                     break;
                 default:
-                    throw new RuntimeException("The type of `op` should be 'c' 'r' 'u' 'd' only");
+                    throw new IllegalArgumentException(
+                              "The type of `op` should be 'c' 'r' 'u' 'd' only");
             }
         }
-        builder.getValue().clear();
+        rows.clear();
     }
 
     @Override
     public synchronized void close() {
-        if (!closed) {
-            closed = true;
+        if (!this.closed) {

Review Comment:
   Return early if `this.closed` is already true.
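   A minimal sketch of an idempotent `close()` with the suggested early return. `ClosableFlusher` is a hypothetical class; its periodic task is a no-op, and the counter stands in for the final flush of buffered rows.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for HugeGraphOutputFormat.close().
class ClosableFlusher {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final ScheduledFuture<?> scheduledFuture;
    private boolean closed = false; // only accessed while holding the lock
    private int finalFlushes = 0;

    ClosableFlusher(long intervalMs) {
        // No-op periodic task in place of the real flushAll()
        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
                () -> { }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void close() {
        if (this.closed) {
            return; // already closed: a second close() is a no-op
        }
        // Stop the periodic flush, then flush what is left exactly once
        this.scheduledFuture.cancel(false);
        this.scheduler.shutdown();
        this.finalFlushes++; // placeholder for flushing remaining buffered rows
        this.closed = true;
    }

    synchronized int finalFlushes() {
        return this.finalFlushes;
    }

    boolean schedulerShutdown() {
        return this.scheduler.isShutdown();
    }
}
```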



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -102,26 +104,26 @@ public void open(int taskNumber, int numTasks) {
         int flushIntervalMs = this.loadOptions.flushIntervalMs;
         if (flushIntervalMs > 0) {
             this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
-                    "hugegraph-streamload-outputformat"));
-            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
-                synchronized (HugeGraphOutputFormat.this) {
-                    if (!closed) {
-                        try {
-                            flushAll();
-                        } catch (Exception e) {
-                            LOG.error("Failed to flush all data.", e);
-                        }
-                    }
-                }
-            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+                                 "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
+                    this::flushAll, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
         }
     }
 
     private void flushAll() {
-        for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
-            List<String> graphElements = builder.getValue();
-            if (graphElements.size() > 0) {
-                flush(builder);
+        synchronized (HugeGraphOutputFormat.this) {
+            if (!this.closed) {
+                try {
+                    for (Map.Entry<ElementBuilder, List<String>> builder :
+                         this.builders.entrySet()) {
+                        List<String> graphElements = builder.getValue();
+                        if (graphElements.size() > 0) {
+                            flush(builder.getKey(), graphElements);
+                        }
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to flush all data.", e);

Review Comment:
   Prefer `LoadException` here.



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -102,26 +104,26 @@ public void open(int taskNumber, int numTasks) {
         int flushIntervalMs = this.loadOptions.flushIntervalMs;
         if (flushIntervalMs > 0) {
             this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
-                    "hugegraph-streamload-outputformat"));
-            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
-                synchronized (HugeGraphOutputFormat.this) {
-                    if (!closed) {
-                        try {
-                            flushAll();
-                        } catch (Exception e) {
-                            LOG.error("Failed to flush all data.", e);
-                        }
-                    }
-                }
-            }, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+                                 "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
+                    this::flushAll, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
         }
     }
 
     private void flushAll() {
-        for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
-            List<String> graphElements = builder.getValue();
-            if (graphElements.size() > 0) {
-                flush(builder);
+        synchronized (HugeGraphOutputFormat.this) {
+            if (!this.closed) {

Review Comment:
   Return early if `this.closed` is already true.





[GitHub] [incubator-hugegraph-toolchain] imbajin commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
imbajin commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r896416882


##########
hugegraph-loader/assembly/static/bin/get-params.sh:
##########
@@ -0,0 +1,24 @@
+#!/bin/bash
+function get_params() {
+  echo "params: $*"
+  engine_params=""
+  hugegraph_params=""
+  while (("$#")); do
+    case "$1" in
+    --file | --graph | --schema | --host | --port | --username | --token | --protocol | \
+      --trust-store-file | --trust-store-password | --clear-all-data | --clear-timeout | \

Review Comment:
   Should line 8 be aligned with line 9?



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java:
##########
@@ -209,7 +208,7 @@ private void parse(Row row,
         graphElements.addAll(elements);
     }
 
-    private void sink(Map.Entry<ElementBuilder, List<GraphElement>> builderMap,
+    private void flush(Map.Entry<ElementBuilder, List<GraphElement>> builderMap,
                       GraphManager g, boolean isCheckVertex) {

Review Comment:
   Should `GraphManager` be aligned with `Map`?



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.exception.LoadException;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+    private static final String JDBC_PREFIX = "jdbc:";
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+

Review Comment:
   This looks like an extra blank line.



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.exception.LoadException;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+    private static final String JDBC_PREFIX = "jdbc:";
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+
+            MySqlSource<String> mysqlSource = buildMysqlSource(struct);
+            DataStreamSource<String> source =
+                    env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
+
+            HugeGraphOutputFormat<Object> format = new HugeGraphOutputFormat<>(struct, options);
+            source.addSink(new HugeGraphSinkFunction<>(format)).setParallelism(1);
+        }
+        env.enableCheckpointing(3000);
+        try {
+            env.execute("flink-cdc-hugegraph");
+        } catch (Exception e) {
+            throw new LoadException("Failed to execute flink", e);
+        }
+    }
+
+    private MySqlSource<String> buildMysqlSource(InputStruct struct) {
+        JDBCSource input = (JDBCSource) struct.input();
+        String url = input.url();
+        String host;
+        int port;
+        try {
+            URIBuilder uriBuilder = new URIBuilder(url.substring(JDBC_PREFIX.length()));
+            host = uriBuilder.getHost();
+            port = uriBuilder.getPort();
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(
+                    String.format("Failed to parse Url(%s) to get hostName and port", url), e);

Review Comment:
   `url` or `URL`?



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.exception.LoadException;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+    private static final String JDBC_PREFIX = "jdbc:";
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+
+            MySqlSource<String> mysqlSource = buildMysqlSource(struct);
+            DataStreamSource<String> source =
+                    env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
+
+            HugeGraphOutputFormat<Object> format = new HugeGraphOutputFormat<>(struct, options);
+            source.addSink(new HugeGraphSinkFunction<>(format)).setParallelism(1);

Review Comment:
   Why do we set the parallelism to `1` by default?
   
   BTW, we could add an option for it if it significantly affects loading speed.
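   One way to act on the suggestion, sketched under assumptions: a hypothetical sink-parallelism option (not an existing loader flag) whose raw string value is resolved with a default of `1`, so current behavior is kept unless the user opts in. `SinkParallelism` is an illustrative name.

```java
// Hypothetical helper for a new (not yet existing) sink-parallelism option.
final class SinkParallelism {

    static final int DEFAULT = 1;

    // Resolves the raw option value; null or empty keeps the current default.
    static int resolve(String value) {
        if (value == null || value.trim().isEmpty()) {
            return DEFAULT;
        }
        int parallelism;
        try {
            parallelism = Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Sink parallelism must be an integer: " + value, e);
        }
        if (parallelism <= 0) {
            throw new IllegalArgumentException(
                    "Sink parallelism must be > 0, got " + parallelism);
        }
        return parallelism;
    }
}
```

The result could be passed to `DataStreamSink.setParallelism(int)` in place of the literal `1`. A plausible reason to keep `1` as the default is ordering: with several sink subtasks, changes to the same row may be routed to different subtasks and applied out of order (for example, a delete overtaking the preceding update) unless the stream is first keyed by the row's primary key.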





[GitHub] [incubator-hugegraph-toolchain] seagle-yuan commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
seagle-yuan commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r898629120


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.exception.LoadException;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = Log.logger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(this.loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : this.struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, this.struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : this.struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, this.struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                                 "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
+                    this::flushAll, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private synchronized void flushAll() {
+        if (this.closed) {
+            return;
+        }
+        try {
+            for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
+                List<String> graphElements = builder.getValue();
+                if (graphElements.size() > 0) {
+                    flush(builder.getKey(), graphElements);
+                }
+            }
+        } catch (Exception e) {
+            throw new LoadException("Failed to flush all data.", e);
+        }
+    }
+
+    @Override
+    public synchronized void writeRecord(T row) throws IOException {
+        for (Map.Entry<ElementBuilder, List<String>> builder :
+                this.builders.entrySet()) {
+            ElementMapping elementMapping = builder.getKey().mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+
+            // Add batch
+            List<String> graphElements = builder.getValue();
+            graphElements.add(row.toString());
+            if (graphElements.size() > elementMapping.batchSize()) {
+                flush(builder.getKey(), builder.getValue());
+            }
+        }
+    }
+
+    private Tuple2<String, List<GraphElement>> buildGraphData(ElementBuilder elementBuilder,
+                                                              String row) {
+        JsonNode node;
+        try {
+            node = new ObjectMapper().readTree(row);
+        } catch (JsonProcessingException e) {
+            throw new ParseException(row, e);
+        }
+        JsonNode data = node.get(Constants.CDC_DATA);
+        String op = node.get(Constants.CDC_OP).asText();
+        String[] fields = this.struct.input().header();
+        String[] values = new String[data.size()];
+        for (int i = 0; i < fields.length; i++) {
+            values[i] = data.get(fields[i]).asText();
+        }
+        return Tuple2.of(op, elementBuilder.build(fields, values));
+    }
+
+    private void flush(ElementBuilder elementBuilder, List<String> rows) {
+        GraphManager g = this.loadContext.client().graph();
+        ElementMapping elementMapping = elementBuilder.mapping();
+        for (String row : rows) {
+            Tuple2<String, List<GraphElement>> graphData = buildGraphData(elementBuilder, row);
+            List<GraphElement> graphElements = graphData.f1;
+            boolean isVertex = elementBuilder.mapping().type().isVertex();
+            switch (Envelope.Operation.forCode(graphData.f0)) {
+                case READ:

Review Comment:
   ok





[GitHub] [incubator-hugegraph-toolchain] javeme commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
javeme commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r889910762


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphSinkFunction.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import javax.annotation.Nonnull;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+public class HugeGraphSinkFunction<T> extends RichSinkFunction<T>
+        implements CheckpointedFunction {

Review Comment:
   Align `implements` with `extends`.



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphSinkFunction.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import javax.annotation.Nonnull;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+public class HugeGraphSinkFunction<T> extends RichSinkFunction<T>
+        implements CheckpointedFunction {
+
+    private static final long serialVersionUID = -2259171589402599426L;
+    private final HugeGraphOutputFormat<Object> outputFormat;
+
+    public HugeGraphSinkFunction(@Nonnull HugeGraphOutputFormat<Object> outputFormat) {
+        this.outputFormat = Preconditions.checkNotNull(outputFormat);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        RuntimeContext ctx = getRuntimeContext();
+        outputFormat.setRuntimeContext(ctx);
+        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+    }
+
+    @Override
+    public void invoke(T value, Context context) throws Exception {
+        outputFormat.writeRecord(value);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) {
+
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+        outputFormat.close();
+        super.close();
+    }
+

Review Comment:
   Remove this blank line.



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);

Review Comment:
   Keep a blank line after the class declaration.



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import static java.util.regex.Pattern.compile;
+
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+            JDBCSource input = (JDBCSource) struct.input();
+            String host = "";
+            int port = -1;
+            Matcher m = compile(Constants.HOST_PORT_REGEX).matcher(input.url());

Review Comment:
   We could try using the `URL` class to parse it: https://docs.oracle.com/javase/tutorial/networking/urls/urlInfo.html
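   A minimal sketch of this suggestion, assuming MySQL-style URLs of the form `jdbc:mysql://host:port/db`. One caveat: `java.net.URL` rejects such strings with a `MalformedURLException` (unknown protocol), with or without the `jdbc:` prefix, so the sketch strips the prefix and uses `java.net.URI` instead. `HostPort` and `JdbcUrlParser` are hypothetical names, not part of the PR.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical value type holding the parsed host and port.
final class HostPort {
    final String host;
    final int port;

    HostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

// Hypothetical helper (not part of the PR): extracts host and port from a
// JDBC URL such as "jdbc:mysql://localhost:3306/test" without a custom regex.
final class JdbcUrlParser {

    private static final String JDBC_PREFIX = "jdbc:";

    private JdbcUrlParser() {
    }

    static HostPort parse(String jdbcUrl) {
        if (jdbcUrl == null || !jdbcUrl.startsWith(JDBC_PREFIX)) {
            throw new IllegalArgumentException("Not a JDBC URL: " + jdbcUrl);
        }
        URI uri;
        try {
            // Drop the "jdbc:" prefix so the rest ("mysql://host:port/db")
            // parses as a hierarchical URI with a server-based authority.
            uri = new URI(jdbcUrl.substring(JDBC_PREFIX.length()));
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed JDBC URL: " + jdbcUrl, e);
        }
        String host = uri.getHost(); // null if the authority is not host[:port]
        int port = uri.getPort();    // -1 if no port is given
        if (host == null || port == -1) {
            throw new IllegalArgumentException(String.format(
                    "Failed to parse URL(%s) to get host name and port", jdbcUrl));
        }
        return new HostPort(host, port);
    }
}
```

   Note that `java.net.URI` returns a null host for names it cannot treat as server-based (for example, host names containing underscores), so the check above would reject those URLs.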



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+

Review Comment:
   You can add a comment such as `// TODO: xx` or `// pass`.



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphSinkFunction.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import javax.annotation.Nonnull;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+public class HugeGraphSinkFunction<T> extends RichSinkFunction<T>
+        implements CheckpointedFunction {
+
+    private static final long serialVersionUID = -2259171589402599426L;
+    private final HugeGraphOutputFormat<Object> outputFormat;
+
+    public HugeGraphSinkFunction(@Nonnull HugeGraphOutputFormat<Object> outputFormat) {
+        this.outputFormat = Preconditions.checkNotNull(outputFormat);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        RuntimeContext ctx = getRuntimeContext();
+        outputFormat.setRuntimeContext(ctx);
+        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+    }
+
+    @Override
+    public void invoke(T value, Context context) throws Exception {
+        outputFormat.writeRecord(value);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) {
+

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import static java.util.regex.Pattern.compile;
+
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+            JDBCSource input = (JDBCSource) struct.input();
+            String host = "";
+            int port = -1;
+            Matcher m = compile(Constants.HOST_PORT_REGEX).matcher(input.url());
+            while (m.find()) {
+                host = m.group(1);
+                port = Integer.parseInt(m.group(2));
+            }
+            E.checkArgument(!"".equals(host) && port != -1,
+                            String.format("Failed to parse Url(%s) to get hostName and port",

Review Comment:
   No need for `String.format()` here, since `checkArgument()` can format the message itself with `%s` placeholders.
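   For illustration, a stand-in that assumes the `checkArgument(boolean, String, Object...)` shape: the message template and its arguments are passed separately and formatted only when the check fails, so the caller can drop the eager `String.format()`. `Checks` is a hypothetical class, not HugeGraph's `E`.

```java
// Hypothetical stand-in for a checkArgument(boolean, String, Object...)
// helper: the template is formatted only when the check fails.
final class Checks {

    private Checks() {
    }

    static void checkArgument(boolean expression, String message, Object... args) {
        if (!expression) {
            // Formatting cost is paid only on the failure path.
            throw new IllegalArgumentException(String.format(message, args));
        }
    }
}
```

   With this shape, the call in the diff would pass the template and `input.url()` as separate arguments instead of a pre-formatted string.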
   



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphSinkFunction.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import javax.annotation.Nonnull;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+public class HugeGraphSinkFunction<T> extends RichSinkFunction<T>
+        implements CheckpointedFunction {
+
+    private static final long serialVersionUID = -2259171589402599426L;
+    private final HugeGraphOutputFormat<Object> outputFormat;
+
+    public HugeGraphSinkFunction(@Nonnull HugeGraphOutputFormat<Object> outputFormat) {
+        this.outputFormat = Preconditions.checkNotNull(outputFormat);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        RuntimeContext ctx = getRuntimeContext();
+        outputFormat.setRuntimeContext(ctx);
+        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+    }
+
+    @Override
+    public void invoke(T value, Context context) throws Exception {
+        outputFormat.writeRecord(value);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) {
+
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) {
+

Review Comment:
   ditto



##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphFlinkCDCLoader.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import static java.util.regex.Pattern.compile;
+
+import java.util.List;
+import java.util.regex.Matcher;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.LoadMapping;
+import com.baidu.hugegraph.loader.source.jdbc.JDBCSource;
+import com.baidu.hugegraph.loader.util.Printer;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.Log;
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+
+public class HugeGraphFlinkCDCLoader {
+    public static final Logger LOG = Log.logger(HugeGraphFlinkCDCLoader.class);
+
+    private final LoadOptions loadOptions;
+    private final String[] options;
+
+    public HugeGraphFlinkCDCLoader(String[] args) {
+        this.options = args;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    public static void main(String[] args) {
+        HugeGraphFlinkCDCLoader loader;
+        try {
+            loader = new HugeGraphFlinkCDCLoader(args);
+        } catch (Throwable e) {
+            Printer.printError("Failed to start loading", e);
+            return;
+        }
+        loader.load();
+    }
+
+    public void load() {
+        LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
+        List<InputStruct> structs = mapping.structs();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        for (InputStruct struct : structs) {
+            JDBCSource input = (JDBCSource) struct.input();
+            String host = "";
+            int port = -1;
+            Matcher m = compile(Constants.HOST_PORT_REGEX).matcher(input.url());
+            while (m.find()) {
+                host = m.group(1);
+                port = Integer.parseInt(m.group(2));
+            }
+            E.checkArgument(!"".equals(host) && port != -1,
+                            String.format("Failed to parse Url(%s) to get hostName and port",
+                                          input.url()));
+
+            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()

Review Comment:
   Prefer the name `mysqlSource`.





[GitHub] [incubator-hugegraph-toolchain] simon824 commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
simon824 commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r898621756


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.exception.LoadException;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+import com.baidu.hugegraph.util.Log;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = Log.logger(HugeGraphOutputFormat.class);
+    private static final long serialVersionUID = -4514164348993670086L;
+    private LoadContext loadContext;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile boolean closed = false;
+
+    private final LoadOptions loadOptions;
+    private final InputStruct struct;
+    private Map<ElementBuilder, List<String>> builders;
+
+    public HugeGraphOutputFormat(InputStruct struct, String[] args) {
+        this.struct = struct;
+        this.loadOptions = LoadOptions.parseOptions(args);
+    }
+
+    private Map<ElementBuilder, List<String>> initBuilders() {
+        LoadContext loadContext = new LoadContext(this.loadOptions);
+        Map<ElementBuilder, List<String>> builders = new HashMap<>();
+        for (VertexMapping vertexMapping : this.struct.vertices()) {
+            builders.put(new VertexBuilder(loadContext, this.struct, vertexMapping),
+                         new ArrayList<>());
+        }
+        for (EdgeMapping edgeMapping : this.struct.edges()) {
+            builders.put(new EdgeBuilder(loadContext, this.struct, edgeMapping),
+                         new ArrayList<>());
+        }
+        return builders;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+        // pass
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) {
+        this.builders = initBuilders();
+        this.loadContext = new LoadContext(this.loadOptions);
+        int flushIntervalMs = this.loadOptions.flushIntervalMs;
+        if (flushIntervalMs > 0) {
+            this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
+                                 "hugegraph-streamload-outputformat"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
+                    this::flushAll, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private synchronized void flushAll() {
+        if (this.closed) {
+            return;
+        }
+        try {
+            for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
+                List<String> graphElements = builder.getValue();
+                if (graphElements.size() > 0) {
+                    flush(builder.getKey(), graphElements);
+                }
+            }
+        } catch (Exception e) {
+            throw new LoadException("Failed to flush all data.", e);
+        }
+    }
+
+    @Override
+    public synchronized void writeRecord(T row) throws IOException {
+        for (Map.Entry<ElementBuilder, List<String>> builder :
+                this.builders.entrySet()) {
+            ElementMapping elementMapping = builder.getKey().mapping();
+            if (elementMapping.skip()) {
+                continue;
+            }
+
+            // Add batch
+            List<String> graphElements = builder.getValue();
+            graphElements.add(row.toString());
+            if (graphElements.size() > elementMapping.batchSize()) {
+                flush(builder.getKey(), builder.getValue());
+            }
+        }
+    }
+
+    private Tuple2<String, List<GraphElement>> buildGraphData(ElementBuilder elementBuilder,
+                                                              String row) {
+        JsonNode node;
+        try {
+            node = new ObjectMapper().readTree(row);
+        } catch (JsonProcessingException e) {
+            throw new ParseException(row, e);
+        }
+        JsonNode data = node.get(Constants.CDC_DATA);
+        String op = node.get(Constants.CDC_OP).asText();
+        String[] fields = this.struct.input().header();
+        String[] values = new String[fields.length];
+        for (int i = 0; i < fields.length; i++) {
+            values[i] = data.get(fields[i]).asText();
+        }
+        return Tuple2.of(op, elementBuilder.build(fields, values));
+    }
+
+    private void flush(ElementBuilder elementBuilder, List<String> rows) {
+        GraphManager g = this.loadContext.client().graph();
+        ElementMapping elementMapping = elementBuilder.mapping();
+        for (String row : rows) {
+            Tuple2<String, List<GraphElement>> graphData = buildGraphData(elementBuilder, row);
+            List<GraphElement> graphElements = graphData.f1;
+            boolean isVertex = elementBuilder.mapping().type().isVertex();
+            switch (Envelope.Operation.forCode(graphData.f0)) {
+                case READ:

Review Comment:
   `case READ:` does not contain any code, as in this screenshot:
   [screenshot: https://user-images.githubusercontent.com/18065113/173973980-d8e40a3a-b2a9-47e1-bdf3-8a9fbc761cfb.png]
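An empty `case READ:` label is legal Java. It falls through to the next label, so snapshot rows (Debezium op code `r`) are handled by whatever branch follows it. The sketch below makes that intent explicit. It assumes READ should be treated like CREATE and UPDATE, which the diff does not confirm. `Op` is a hypothetical stand-in for `io.debezium.data.Envelope.Operation`; only the single-letter codes `r`, `c`, `u` and `d` are taken from Debezium. `Action` is an illustrative placeholder for the HugeGraph batch calls.

```java
import java.util.Arrays;

public class CdcOpDispatch {

    // Hypothetical stand-in for io.debezium.data.Envelope.Operation;
    // the single-letter codes match Debezium's "op" field values.
    public enum Op {
        READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

        private final String code;

        Op(String code) {
            this.code = code;
        }

        // Unlike the real Debezium lookup, this sketch throws on unknown codes.
        public static Op forCode(String code) {
            for (Op op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown CDC op: " + code);
        }
    }

    // Hypothetical actions standing in for the HugeGraph batch requests.
    public enum Action { UPSERT, DELETE }

    public static Action actionFor(String code) {
        switch (Op.forCode(code)) {
            case READ:   // snapshot row: intentionally falls through, treated as an insert
            case CREATE:
            case UPDATE:
                return Action.UPSERT;
            case DELETE:
                return Action.DELETE;
            default:
                throw new IllegalStateException("Unhandled op: " + code);
        }
    }

    public static void main(String[] args) {
        for (String code : Arrays.asList("r", "c", "u", "d")) {
            System.out.println(code + " -> " + actionFor(code));
        }
    }
}
```

Running `main` prints `r -> UPSERT`, `c -> UPSERT`, `u -> UPSERT` and `d -> DELETE`, one per line. Grouping the labels and adding a short comment tells the next reader that the fall-through is deliberate.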
   





[GitHub] [incubator-hugegraph-toolchain] simon824 commented on a diff in pull request #291: feat: Introduce HugeGraphFlinkCDCLoader

Posted by GitBox <gi...@apache.org>.
simon824 commented on code in PR #291:
URL: https://github.com/apache/incubator-hugegraph-toolchain/pull/291#discussion_r895483120


##########
hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/flink/HugeGraphOutputFormat.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.baidu.hugegraph.driver.GraphManager;
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.exception.LoadException;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
+import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.UpdateStrategy;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import io.debezium.data.Envelope;
+
+public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HugeGraphOutputFormat.class);

Review Comment:
   Got it, thanks for your review.
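In `HugeGraphOutputFormat`, the `writeRecord`/`flushAll` pair uses two flush triggers. One is a per-builder size threshold. The other is a fixed-delay timer driven by `flushIntervalMs`. The sketch below shows the same pattern in self-contained form. It assumes a single buffer and a generic `Consumer<List<String>>` sink in place of the HugeGraph client, and all names in it are hypothetical. It flushes with `>=`. The diff's `graphElements.size() > elementMapping.batchSize()` check flushes only once a batch holds `batchSize + 1` rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of size-or-time batching; not the HugeGraph implementation.
public class BatchingBuffer {

    private final int batchSize;
    // Hypothetical sink standing in for the HugeGraph client batch call.
    private final Consumer<List<String>> sink;
    private final List<String> buffer = new ArrayList<>();
    private ScheduledExecutorService scheduler;
    private boolean closed;

    public BatchingBuffer(int batchSize, Consumer<List<String>> sink) {
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Optional periodic flush, mirroring the flushIntervalMs > 0 check in open().
    public void start(long flushIntervalMs) {
        if (flushIntervalMs > 0) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor();
            this.scheduler.scheduleWithFixedDelay(this::flush, flushIntervalMs,
                                                  flushIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    // Size-triggered flush: emit as soon as the buffer reaches batchSize rows.
    public synchronized void write(String row) {
        this.buffer.add(row);
        if (this.buffer.size() >= this.batchSize) {
            this.flush();
        }
    }

    // Hand a copy of the buffered rows to the sink, then clear the buffer.
    public synchronized void flush() {
        if (this.closed || this.buffer.isEmpty()) {
            return;
        }
        this.sink.accept(new ArrayList<>(this.buffer));
        this.buffer.clear();
    }

    // Drain any remaining rows, then stop the timer.
    public synchronized void close() {
        this.flush();
        this.closed = true;
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        BatchingBuffer buf = new BatchingBuffer(2, batches::add);
        buf.write("a");
        buf.write("b");   // reaches batchSize, so [a, b] is flushed
        buf.write("c");
        buf.close();      // drains the remaining [c]
        System.out.println(batches);  // [[a, b], [c]]
    }
}
```

With `batchSize = 2`, writing `a`, `b` and `c` and then calling `close()` produces `[[a, b], [c]]`. One caveat comes from the `ScheduledExecutorService.scheduleWithFixedDelay` contract: if any run of the task throws, subsequent runs are suppressed. Because `flushAll` rethrows `LoadException`, its timer-driven flushing may stop silently after the first failure.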
   


