Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/07/01 05:36:42 UTC

[GitHub] [incubator-doris] hf200012 commented on a change in pull request #6107: feat: Implementation Datax doriswriter plugin

hf200012 commented on a change in pull request #6107:
URL: https://github.com/apache/incubator-doris/pull/6107#discussion_r661987417



##########
File path: extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
##########
@@ -0,0 +1,158 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.UUID;
+
+public class DorisWriterEmitter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DorisWriterEmitter.class);
+	private final Key keys;
+	private int pos;
+
+
+	public DorisWriterEmitter(final Key keys) {
+		this.keys = keys;
+	}
+
+
+	/**
+	 * Execute Doris stream load for one batch of data.
+	 */
+	public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
+		final String host = this.getAvailableHost();
+		if (null == host) {
+			throw new IOException("None of the hosts in `beLoadUrl` could be connected.");
+		}
+		final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
+		LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));

Review comment:
       It is recommended to use the info level here to facilitate troubleshooting:
   debug -> info
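   A minimal sketch of the suggested change (the same statement, logged at the info level):

       LOG.info(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].",
               flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));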

##########
File path: extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
##########
@@ -0,0 +1,242 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
+import com.alibaba.datax.plugin.rdbms.writer.Constant;
+import com.alibaba.druid.sql.parser.ParserException;
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+public class DorisWriter extends Writer {
+    public DorisWriter() {
+    }
+
+    public static class Task extends com.alibaba.datax.common.spi.Writer.Task {
+        private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Task.class);
+
+        private DorisWriterEmitter dorisWriterEmitter;
+        private Key keys;
+        private DorisJsonCodec rowCodec;
+
+
+        public Task() {
+        }
+
+        @Override
+        public void init() {
+            this.keys = new Key(super.getPluginJobConf());
+            this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
+            this.dorisWriterEmitter = new DorisWriterEmitter(keys);
+        }
+
+        @Override
+        public void prepare() {
+        }
+
+        @Override
+        public void startWrite(RecordReceiver recordReceiver) {
+            try {
+                List<String> buffer = new ArrayList<>();
+                int batchCount = 0;
+                long batchByteSize = 0L;
+                Record record;
+                // loop to get record from datax
+                while ((record = recordReceiver.getFromReader()) != null) {
+                    // check column size
+                    if (record.getColumnNumber() != this.keys.getColumns().size()) {
+                        throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+                                String.format("Writer column config error: the reader has %s columns but the writer has %s. Please check your DataX job config json.", record.getColumnNumber(), this.keys.getColumns().size()));
+                    }
+                    // codec record
+                    final String recordStr = this.rowCodec.serialize(record);
+                    // put into buffer
+                    buffer.add(recordStr);
+                    batchCount += 1;
+                    batchByteSize += recordStr.getBytes().length;
+                    // trigger buffer
+                    if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
+                        // generate doris stream load label
+                        final String label = getStreamLoadLabel();
+                        LOG.debug(String.format("Doris buffer sinking triggered: rows[%d] label[%s].", batchCount, label));
+                        final DorisFlushBatch flushBatch = new DorisFlushBatch(label, batchByteSize, buffer);
+                        dorisWriterEmitter.doStreamLoad(flushBatch);
+                        // clear buffer
+                        batchCount = 0;
+                        batchByteSize = 0L;
+                        buffer.clear();
+                    }
+                }
+                if (buffer.size() > 0) {
+                    final DorisFlushBatch flushBatch = new DorisFlushBatch(getStreamLoadLabel(), batchByteSize, buffer);
+                    dorisWriterEmitter.doStreamLoad(flushBatch);
+                }
+
+            } catch (Exception e) {
+                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+            }
+        }
+
+        private String getStreamLoadLabel() {
+            return UUID.randomUUID().toString();
+        }
+
+        @Override
+        public void post() {
+
+        }
+
+        @Override
+        public void destroy() {
+        }
+
+        @Override
+        public boolean supportFailOver() {
+            return false;
+        }
+    }
+
+    public static class Job extends com.alibaba.datax.common.spi.Writer.Job {
+        private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Job.class);
+        private Configuration originalConfig = null;
+        private Key keys;
+
+        public Job() {
+        }
+
+        @Override
+        public void init() {
+            this.originalConfig = super.getPluginJobConf();
+            this.keys = new Key(super.getPluginJobConf());
+            this.keys.doPretreatment();
+        }
+
+        @Override
+        public void preCheck() {
+            this.init();
+            this.preCheckPrePareSQL(this.keys);
+            this.preCheckPostSQL(this.keys);
+        }
+
+        @Override
+        public void prepare() {
+            String username = this.keys.getUsername();
+            String password = this.keys.getPassword();
+            String jdbcUrl = this.keys.getJdbcUrl();
+            List<String> renderedPreSqls = this.renderPreOrPostSqls(this.keys.getPreSqlList(), this.keys.getTable());
+            if (!renderedPreSqls.isEmpty()) {
+                Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
+                LOG.info("Prepare to execute preSqls: [{}]. Doris jdbc url: {}.", String.join(";", renderedPreSqls), jdbcUrl);
+                this.executeSqls(conn, renderedPreSqls);
+                DBUtil.closeDBResources(null, null, conn);
+            }
+
+        }
+
+        @Override
+        public List<Configuration> split(int mandatoryNumber) {
+            List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
+
+            for(int i = 0; i < mandatoryNumber; ++i) {

Review comment:
       Add a space after `for`.
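   For reference, the corrected formatting would look like this (loop body elided):

       for (int i = 0; i < mandatoryNumber; ++i) {
           // ... loop body unchanged ...
       }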

##########
File path: extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java
##########
@@ -0,0 +1,158 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.UUID;
+
+public class DorisWriterEmitter {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DorisWriterEmitter.class);
+	private final Key keys;
+	private int pos;
+
+
+	public DorisWriterEmitter(final Key keys) {
+		this.keys = keys;
+	}
+
+
+	/**
+	 * Execute Doris stream load for one batch of data.
+	 */
+	public void doStreamLoad(final DorisFlushBatch flushData) throws IOException {
+		final String host = this.getAvailableHost();
+		if (null == host) {
+			throw new IOException("None of the hosts in `beLoadUrl` could be connected.");
+		}
+		final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
+		LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
+		// do http put request
+		final Map<String, Object> loadResult = this.doHttpPut(loadUrl, flushData.getLabel(), this.mergeRows(flushData.getRows()));
+		// get response
+		final String keyStatus = "Status";
+		if (null == loadResult || !loadResult.containsKey(keyStatus)) {
+			throw new IOException("Unable to flush data to doris: unknown result status.");
+		}
+		LOG.debug("StreamLoad response:\n" + JSON.toJSONString(loadResult));

Review comment:
       It is recommended to use the info level here to facilitate troubleshooting:
   debug -> info
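   A minimal sketch of that change as well:

       LOG.info("StreamLoad response:\n" + JSON.toJSONString(loadResult));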

##########
File path: extension/datax-doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java
##########
@@ -0,0 +1,242 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.spi.Writer;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
+import com.alibaba.datax.plugin.rdbms.writer.Constant;
+import com.alibaba.druid.sql.parser.ParserException;
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+public class DorisWriter extends Writer {
+    public DorisWriter() {
+    }
+
+    public static class Task extends com.alibaba.datax.common.spi.Writer.Task {
+        private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Task.class);
+
+        private DorisWriterEmitter dorisWriterEmitter;
+        private Key keys;
+        private DorisJsonCodec rowCodec;
+
+
+        public Task() {
+        }
+
+        @Override
+        public void init() {
+            this.keys = new Key(super.getPluginJobConf());
+            this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
+            this.dorisWriterEmitter = new DorisWriterEmitter(keys);
+        }
+
+        @Override
+        public void prepare() {
+        }
+
+        @Override
+        public void startWrite(RecordReceiver recordReceiver) {
+            try {
+                List<String> buffer = new ArrayList<>();
+                int batchCount = 0;
+                long batchByteSize = 0L;
+                Record record;
+                // loop to get record from datax
+                while ((record = recordReceiver.getFromReader()) != null) {
+                    // check column size
+                    if (record.getColumnNumber() != this.keys.getColumns().size()) {
+                        throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
+                                String.format("Writer column config error: the reader has %s columns but the writer has %s. Please check your DataX job config json.", record.getColumnNumber(), this.keys.getColumns().size()));
+                    }
+                    // codec record
+                    final String recordStr = this.rowCodec.serialize(record);
+                    // put into buffer
+                    buffer.add(recordStr);
+                    batchCount += 1;
+                    batchByteSize += recordStr.getBytes().length;
+                    // trigger buffer
+                    if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
+                        // generate doris stream load label
+                        final String label = getStreamLoadLabel();
+                        LOG.debug(String.format("Doris buffer sinking triggered: rows[%d] label[%s].", batchCount, label));
+                        final DorisFlushBatch flushBatch = new DorisFlushBatch(label, batchByteSize, buffer);
+                        dorisWriterEmitter.doStreamLoad(flushBatch);
+                        // clear buffer
+                        batchCount = 0;
+                        batchByteSize = 0L;
+                        buffer.clear();
+                    }
+                }
+                if (buffer.size() > 0) {
+                    final DorisFlushBatch flushBatch = new DorisFlushBatch(getStreamLoadLabel(), batchByteSize, buffer);
+                    dorisWriterEmitter.doStreamLoad(flushBatch);
+                }
+
+            } catch (Exception e) {
+                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
+            }
+        }
+
+        private String getStreamLoadLabel() {
+            return UUID.randomUUID().toString();

Review comment:
       Labels for DataX imports could uniformly start with the prefix datax_doris_writer_,
   e.g.: "datax_doris_writer_" + UUID.randomUUID().toString();
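   A minimal sketch of the suggested label generation (the LABEL_PREFIX constant name is
   illustrative, not part of the patch):

       private static final String LABEL_PREFIX = "datax_doris_writer_";

       private String getStreamLoadLabel() {
           return LABEL_PREFIX + UUID.randomUUID().toString();
       }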




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org