Posted to commits@doris.apache.org by mo...@apache.org on 2020/06/11 00:55:04 UTC

[incubator-doris] branch master updated: [Extension] Logstash Doris output plugin (#3800)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 86d235a  [Extension] Logstash Doris output plugin (#3800)
86d235a is described below

commit 86d235a76a4dddbf8de51301e790d2cf9faf4aed
Author: wfjcmcb <33...@users.noreply.github.com>
AuthorDate: Thu Jun 11 08:54:51 2020 +0800

    [Extension] Logstash Doris output plugin (#3800)
    
    This plugin is used to output data to Doris for logstash
    Use the HTTP protocol to interact with the Doris FE Http interface
    Load data through Doris's stream load
---
 docs/.vuepress/sidebar/en.js                       |   1 +
 docs/.vuepress/sidebar/zh-CN.js                    |   1 +
 docs/en/extending-doris/logstash.md                | 198 ++++++++++++++
 docs/zh-CN/extending-doris/logstash.md             | 198 ++++++++++++++
 extension/logstash/Gemfile                         |  20 ++
 extension/logstash/LICENSE                         |  16 ++
 extension/logstash/README.md                       |  28 ++
 extension/logstash/Rakefile                        |  25 ++
 extension/logstash/lib/logstash/outputs/doris.rb   | 294 +++++++++++++++++++++
 .../lib/logstash/util/shortname_resolver.rb        |  58 ++++
 extension/logstash/logstash-output-doris.gemspec   |  47 ++++
 11 files changed, 886 insertions(+)

diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index c99defd..7d5fefa 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -126,6 +126,7 @@ module.exports = [
       "plugin-development-manual",
       "user-defined-function",
       "spark-doris-connector",
+      "logstash",
     ],
   },
   {
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 9e0ffcc..e1d1508 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -137,6 +137,7 @@ module.exports = [
       "plugin-development-manual",
       "user-defined-function",
       "spark-doris-connector",
+      "logstash",
     ],
   },
   {
diff --git a/docs/en/extending-doris/logstash.md b/docs/en/extending-doris/logstash.md
new file mode 100644
index 0000000..75d5647
--- /dev/null
+++ b/docs/en/extending-doris/logstash.md
@@ -0,0 +1,198 @@
+---
+{
+    "title": "Logstash Doris Output Plugin",
+    "language": "en"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris output plugin
+
+This plugin lets Logstash output data to Doris. It talks to the Doris FE over its HTTP interface and imports data through Doris's stream load.
+
+[Learn more about Doris Stream Load](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)
+
+[Learn more about Doris](http://doris.apache.org/master/zh-CN/)
+
+
+## Install and compile
+### 1. Download source code
+
+### 2. Compile
+Run the following in the extension/logstash/ directory:
+
+`gem build logstash-output-doris.gemspec`
+
+This produces the file logstash-output-doris-{version}.gem in the same directory.
+
+### 3. Install the plugin
+Copy logstash-output-doris-{version}.gem to the Logstash installation directory.
+
+Run the command
+
+`./bin/logstash-plugin install logstash-output-doris-{version}.gem`
+
+to install the logstash-output-doris plugin.
+
+## Configuration
+### Example:
+
+Create a new configuration file in the config directory and name it logstash-doris.conf
+
+The specific configuration is as follows:
+
+    output {
+        doris {
+            http_hosts => [ "http://fehost:8030" ]
+            user => user_name
+            password => password
+            db => "db_name"
+            table => "table_name"
+            label_prefix => "label_prefix"
+            column_separator => ","
+        }
+    }
+
+Configuration instructions:
+
+Connection configuration:
+
+Configuration | Explanation
+--- | ---
+`http_hosts` | HTTP addresses of the FE nodes, e.g. `["http://fe1:8030", "http://fe2:8030"]`
+`user` | User name; the user needs import (load) permission on the Doris table
+`password` | Password
+`db` | Database name
+`table` | Table name
+`label_prefix` | Prefix of the import label; the final generated label is *{label\_prefix}\_{db}\_{table}\_{time_stamp}*
+
+
+Load configuration ([reference document](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)):
+
+Configuration | Explanation
+--- | ---
+`column_separator` | Column separator; the default is \t
+`columns` | Mapping between the columns in the imported data and the columns in the table
+`where` | Filter condition applied by the import task
+`max_filter_ratio` | Maximum ratio of rows the import task may filter out; the default is 0 (zero tolerance)
+`partition` | Partitions of the table to import into
+`timeout` | Timeout; the default is 600s
+`strict_mode` | Strict mode; the default is false
+`timezone` | Time zone used for this import; the default is UTC+8
+`exec_mem_limit` | Memory limit for the import in bytes; the default is 2GB
+
+Other configuration:
+
+Configuration | Explanation
+--- | ---
+`save_on_failure` | Whether to save the data locally if the import fails; the default is true
+`save_dir` | Local save directory; the default is /tmp
+`automatic_retries` | Maximum number of retries on failure; the default is 3
+`batch_size` | Maximum number of events processed per batch; the default is 100000
+`idle_flush_time` | Maximum interval between flushes; the default is 20 (seconds)
+
+
+## Start Up
+Run the command to start the doris output plugin:
+
+`{logstash-home}/bin/logstash -f {logstash-home}/config/logstash-doris.conf --config.reload.automatic`
+
+
+
+
+## Complete usage example
+### 1. Compile doris-output-plugin
+1> Download the Ruby tarball from the [ruby official website](https://www.ruby-lang.org/en/downloads/). Version 2.7.1 is used here.
+
+2> Compile and install Ruby, then configure the Ruby environment variables
+
+3> Go to the extension/logstash/ directory of the Doris source and execute
+
+`gem build logstash-output-doris.gemspec`
+
+This produces the file logstash-output-doris-0.1.0.gem, and the compilation is complete
+
+### 2. Install and configure filebeat (filebeat is used as the input here)
+
+1> Download the filebeat tar package from the [es official website](https://www.elastic.co/) and decompress it
+
+2> Enter the filebeat directory and modify the configuration file filebeat.yml as follows:
+
+	filebeat.inputs:
+	- type: log
+	  paths:
+	    - /tmp/doris.data
+	output.logstash:
+	  hosts: ["localhost:5044"]
+
+/tmp/doris.data is the path of the data file to load into Doris
+
+3> Start filebeat:
+
+`./filebeat -e -c filebeat.yml -d "publish"`
+
+
+### 3. Install logstash and doris-output-plugin
+1> Download the logstash tar package from the [es official website](https://www.elastic.co/) and decompress it
+
+2> Copy the logstash-output-doris-0.1.0.gem obtained in step 1 to the logstash installation directory
+
+3> Execute
+
+`./bin/logstash-plugin install logstash-output-doris-0.1.0.gem`
+
+to install the plugin
+
+4> Create a new configuration file logstash-doris.conf in the config directory as follows:
+
+	input {
+	    beats {
+	        port => "5044"
+	    }
+	}
+	
+	output {
+	    doris {
+	        http_hosts => [ "http://127.0.0.1:8030" ]
+	        user => doris
+	        password => doris
+	        db => "logstash_output_test"
+	        table => "output"
+	        label_prefix => "doris"
+	        column_separator => ","
+	        columns => "a,b,c,d,e"
+	    }
+	}
+
+Adjust the settings here according to the configuration instructions above
+
+5> Start logstash:
+
+`./bin/logstash -f ./config/logstash-doris.conf --config.reload.automatic`
+
+### 4. Test Load
+
+Append data to /tmp/doris.data
+
+`echo a,b,c,d,e >> /tmp/doris.data`
+
+Observe the logstash log. If the Status of the returned response is Success, the import succeeded, and you can view the imported data in the logstash_output_test.output table
+
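Note on the documentation above: it describes the plugin as an HTTP client of the FE stream load interface. As a rough illustration only, the sketch below builds (without sending) a request of the same shape using Ruby's standard `net/http`. The helper name `build_stream_load_request` and the sample label are hypothetical; the plugin itself uses rest-client and follows the 301/302/307 redirect returned by the FE.

```ruby
require "net/http"
require "uri"

# Hypothetical helper: builds, but does not send, a stream load PUT request
# shaped like the one the plugin issues to /api/{db}/{table}/_stream_load.
def build_stream_load_request(fe_url, db, table, user, password, label, opts = {})
  uri = URI.join(fe_url, "/api/#{db}/#{table}/_stream_load")
  req = Net::HTTP::Put.new(uri)
  req.basic_auth(user, password)                    # Authorization: Basic ...
  req["Expect"] = "100-continue"
  req["Content-Type"] = "text/plain;charset=utf-8"
  req["label"] = label
  # Load options such as column_separator or columns travel as HTTP headers.
  opts.each { |name, value| req[name] = value.to_s }
  req
end

req = build_stream_load_request(
  "http://127.0.0.1:8030", "logstash_output_test", "output",
  "doris", "doris", "doris_logstash_output_test_output_20200611085451_000",
  { "column_separator" => ",", "columns" => "a,b,c,d,e" }
)
req.body = "a,b,c,d,e\n"   # one event per line, as in the plugin's flush
puts req.path
```

Passing the load options as a plain hash keeps the sketch close to the plugin's `make_request_headers`, where each configured option becomes one header.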
diff --git a/docs/zh-CN/extending-doris/logstash.md b/docs/zh-CN/extending-doris/logstash.md
new file mode 100644
index 0000000..467a886
--- /dev/null
+++ b/docs/zh-CN/extending-doris/logstash.md
@@ -0,0 +1,198 @@
+---
+{
+    "title": "Logstash Doris Output Plugin",
+    "language": "zh-CN"
+}
+---
+
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris output plugin
+
+This plugin lets Logstash output data to Doris. It talks to the Doris FE over its HTTP interface and imports data through Doris's stream load.
+
+[Learn more about Doris Stream Load](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)
+
+[Learn more about Doris](http://doris.apache.org/master/zh-CN/)
+
+
+## Install and compile
+### 1. Download the plugin source code
+
+### 2. Compile
+Run the following in the extension/logstash/ directory:
+
+`gem build logstash-output-doris.gemspec`
+
+This produces the file logstash-output-doris-{version}.gem in the same directory.
+
+### 3. Install the plugin
+Copy logstash-output-doris-{version}.gem to the Logstash installation directory.
+
+Run the command
+
+`./bin/logstash-plugin install logstash-output-doris-{version}.gem`
+
+to install the logstash-output-doris plugin.
+
+## Configuration
+### Example:
+
+Create a new configuration file in the config directory and name it logstash-doris.conf
+
+The specific configuration is as follows:
+
+    output {
+        doris {
+            http_hosts => [ "http://fehost:8030" ]
+            user => user_name
+            password => password
+            db => "db_name"
+            table => "table_name"
+            label_prefix => "label_prefix"
+            column_separator => ","
+        }
+    }
+
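+Configuration instructions:
+
+Connection configuration:
+
+Configuration | Explanation
+--- | ---
+`http_hosts` | HTTP addresses of the FE nodes, e.g. `["http://fe1:8030", "http://fe2:8030"]`
+`user` | User name; the user needs import (load) permission on the corresponding Doris database and table
+`password` | Password
+`db` | Database name
+`table` | Table name
+`label_prefix` | Prefix of the import label; the final generated label is *{label\_prefix}\_{db}\_{table}\_{time_stamp}*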
+
+
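+Load configuration ([reference document](http://doris.apache.org/master/zh-CN/administrator-guide/load-data/stream-load-manual.html)):
+
+Configuration | Explanation
+--- | ---
+`column_separator` | Column separator; the default is \t
+`columns` | Mapping between the columns in the imported data and the columns in the table
+`where` | Filter condition applied by the import task
+`max_filter_ratio` | Maximum ratio of rows the import task may filter out; the default is 0 (zero tolerance)
+`partition` | Partitions of the table to import into
+`timeout` | Timeout; the default is 600s
+`strict_mode` | Strict mode; the default is false
+`timezone` | Time zone used for this import; the default is UTC+8
+`exec_mem_limit` | Memory limit for the import in bytes; the default is 2GB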
+
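+Other configuration:
+
+Configuration | Explanation
+--- | ---
+`save_on_failure` | Whether to save the data locally if the import fails; the default is true
+`save_dir` | Local save directory; the default is /tmp
+`automatic_retries` | Maximum number of retries on failure; the default is 3
+`batch_size` | Maximum number of events processed per batch; the default is 100000
+`idle_flush_time` | Maximum interval between flushes; the default is 20 (seconds)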
+
+
+## Start Up
+Run the command to start the doris output plugin:
+
+`{logstash-home}/bin/logstash -f {logstash-home}/config/logstash-doris.conf --config.reload.automatic`
+
+
+
+
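+## Complete usage example
+### 1. Compile doris-output-plugin
+1> Download the Ruby tarball from the [ruby official website](https://www.ruby-lang.org/en/downloads/). Version 2.7.1 is used here.
+
+2> Compile and install Ruby, then configure the Ruby environment variables
+
+3> Go to the extension/logstash/ directory of the Doris source and execute
+
+`gem build logstash-output-doris.gemspec`
+
+This produces the file logstash-output-doris-0.1.0.gem, and the compilation is complete
+
+### 2. Install and configure filebeat (filebeat is used as the input here)
+
+1> Download the filebeat tar package from the [es official website](https://www.elastic.co/) and decompress it
+
+2> Enter the filebeat directory and modify the configuration file filebeat.yml as follows: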
+
+	filebeat.inputs:
+	- type: log
+	  paths:
+	    - /tmp/doris.data
+	output.logstash:
+	  hosts: ["localhost:5044"]
+
+/tmp/doris.data is the path of the data file to load into Doris
+
+3> Start filebeat:
+
+`./filebeat -e -c filebeat.yml -d "publish"`
+
+
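+### 3. Install logstash and doris-output-plugin
+1> Download the logstash tar package from the [es official website](https://www.elastic.co/) and decompress it
+
+2> Copy the logstash-output-doris-0.1.0.gem obtained in step 1 to the logstash installation directory
+
+3> Execute
+
+`./bin/logstash-plugin install logstash-output-doris-0.1.0.gem`
+
+to install the plugin
+
+4> Create a new configuration file logstash-doris.conf in the config directory as follows: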
+
+	input {
+	    beats {
+	        port => "5044"
+	    }
+	}
+	
+	output {
+	    doris {
+	        http_hosts => [ "http://127.0.0.1:8030" ]
+	        user => doris
+	        password => doris
+	        db => "logstash_output_test"
+	        table => "output"
+	        label_prefix => "doris"
+	        column_separator => ","
+	        columns => "a,b,c,d,e"
+	    }
+	}
+
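+Adjust the settings here according to the configuration instructions above
+
+5> Start logstash:
+
+`./bin/logstash -f ./config/logstash-doris.conf --config.reload.automatic`
+
+### 4. Test Load
+
+Append data to /tmp/doris.data
+
+`echo a,b,c,d,e >> /tmp/doris.data`
+
+Observe the logstash log. If the Status of the returned response is Success, the import succeeded, and you can view the imported data in the logstash_output_test.output table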
+
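Note on `label_prefix`: the docs give the label as {label_prefix}_{db}_{table}_{time_stamp}, and the plugin source formats the timestamp with `strftime('%Y%m%d%H%M%S_%L')`. The sketch below reproduces that format in isolation; the helper name `stream_load_label` is hypothetical and not part of the plugin.

```ruby
# Hypothetical helper mirroring the label format used in the plugin's flush:
# prefix, database, table and a millisecond-resolution timestamp joined by "_".
def stream_load_label(prefix, db, table, now = Time.now)
  [prefix, db, table, now.strftime("%Y%m%d%H%M%S_%L")].join("_")
end

puts stream_load_label("doris", "logstash_output_test", "output")
```

Because the timestamp only has millisecond resolution, two batches flushed within the same millisecond for the same table would get the same label. With the plugin's `concurrency :single` setting this seems unlikely, but it is worth keeping in mind when choosing distinct `label_prefix` values for multiple Logstash instances.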
diff --git a/extension/logstash/Gemfile b/extension/logstash/Gemfile
new file mode 100644
index 0000000..d2d262b
--- /dev/null
+++ b/extension/logstash/Gemfile
@@ -0,0 +1,20 @@
+=begin
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+=end
+source 'https://rubygems.org'
+gemspec
\ No newline at end of file
diff --git a/extension/logstash/LICENSE b/extension/logstash/LICENSE
new file mode 100644
index 0000000..90705e0
--- /dev/null
+++ b/extension/logstash/LICENSE
@@ -0,0 +1,16 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
\ No newline at end of file
diff --git a/extension/logstash/README.md b/extension/logstash/README.md
new file mode 100644
index 0000000..57e594d
--- /dev/null
+++ b/extension/logstash/README.md
@@ -0,0 +1,28 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+1. How to build
+
+	`gem build logstash-output-doris.gemspec`
+
+2. How to use
+
+	`http://doris.incubator.apache.org/master/en/extending-doris/logstash.html`
+	`http://doris.incubator.apache.org/master/zh-CN/extending-doris/logstash.html`
+
diff --git a/extension/logstash/Rakefile b/extension/logstash/Rakefile
new file mode 100644
index 0000000..27ac367
--- /dev/null
+++ b/extension/logstash/Rakefile
@@ -0,0 +1,25 @@
+=begin
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+=end
+@files=[]
+
+task :default do
+  system("rake -T")
+end
+
+require "logstash/devutils/rake"
diff --git a/extension/logstash/lib/logstash/outputs/doris.rb b/extension/logstash/lib/logstash/outputs/doris.rb
new file mode 100644
index 0000000..15d1b4f
--- /dev/null
+++ b/extension/logstash/lib/logstash/outputs/doris.rb
@@ -0,0 +1,294 @@
+=begin
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+=end
+
+# encoding: utf-8
+require "logstash/outputs/base"
+require "logstash/namespace"
+require "logstash/json"
+require "logstash/util/shortname_resolver"
+require "uri"
+require "stud/buffer"
+require "logstash/plugin_mixins/http_client"
+require "securerandom"
+require "json"
+require "base64"
+require "restclient"
+
+
+class LogStash::Outputs::Doris < LogStash::Outputs::Base
+   include LogStash::PluginMixins::HttpClient
+   include Stud::Buffer
+
+   concurrency :single
+
+   config_name "doris"
+   # hosts array of Doris Frontends. eg ["http://fe1:8030", "http://fe2:8030"]
+   config :http_hosts, :validate => :array, :required => true
+   # the database which data is loaded to
+   config :db, :validate => :string, :required => true
+   # the table which data is loaded to
+   config :table, :validate => :string, :required => true
+   # label prefix of a stream load request.
+   config :label_prefix, :validate => :string, :required => true
+   # user
+   config :user, :validate => :string, :required => true
+   # password
+   config :password, :validate => :password, :required => true
+   # column separator
+   config :column_separator, :validate => :string, :default => ""
+   # column mappings. eg: "k1, k2, tmpk3, k3 = tmpk3 + 1"
+   config :columns, :validate => :string, :default => ""
+   # where predicate to filter data. eg: "k1 > 1 and k3 < 100"
+   config :where, :validate => :string, :default => ""
+   # max filter ratio
+   config :max_filter_ratio, :validate => :number, :default => -1
+   # partitions which data is loaded to. eg: ["p1", "p2"]
+   config :partition, :validate => :array, :default => []
+   # timeout of a stream load, in second
+   config :timeout, :validate => :number, :default => -1
+   # switch off or on of strict mode
+   config :strict_mode, :validate => :string, :default => "false"
+   # timezone
+   config :timezone, :validate => :string, :default => ""
+   # memory limit of a stream load
+   config :exec_mem_limit, :validate => :number, :default => -1
+
+   # Custom headers to use
+   # format is `headers => ["X-My-Header", "%{host}"]`
+   config :headers, :validate => :hash
+
+   config :batch_size, :validate => :number, :default => 100000
+
+   config :idle_flush_time, :validate => :number, :default => 20
+
+   config :save_on_failure, :validate => :boolean, :default => true
+
+   config :save_dir, :validate => :string, :default => "/tmp"
+
+   config :save_file, :validate => :string, :default => "failed.data"
+
+   config :host_resolve_ttl_sec, :validate => :number, :default => 120
+
+   config :automatic_retries, :validate => :number, :default => 3
+
+
+   def print_plugin_info()
+      @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-doris/ }
+      @plugin_name = @@plugins[0].name
+      @plugin_version = @@plugins[0].version
+      @logger.debug("Running #{@plugin_name} version #{@plugin_version}")
+
+      @logger.info("Initialized doris output with settings",
+      :db => @db,
+      :table => @table,
+      :label_prefix => @label_prefix,
+      :batch_size => @batch_size,
+      :idle_flush_time => @idle_flush_time,
+      :http_hosts => @http_hosts)
+   end
+
+   def register
+      # Handle this deprecated option. TODO: remove the option
+      #@ssl_certificate_validation = @verify_ssl if @verify_ssl
+
+      # We count outstanding requests with this queue
+      # This queue tracks the requests to create backpressure
+      # When this queue is empty no new requests may be sent,
+      # tokens must be added back by the client on success
+      #@request_tokens = SizedQueue.new(@pool_max)
+      #@pool_max.times {|t| @request_tokens << true }
+      #@requests = Array.new
+
+      @http_query = "/api/#{db}/#{table}/_stream_load"
+
+      @hostnames_pool =
+      parse_http_hosts(http_hosts,
+      ShortNameResolver.new(ttl: @host_resolve_ttl_sec, logger: @logger))
+
+      @request_headers = make_request_headers
+      @logger.info("request headers: ", @request_headers)
+
+      buffer_initialize(
+      :max_items => @batch_size,
+      :max_interval => @idle_flush_time,
+      :logger => @logger
+      )
+
+      print_plugin_info()
+   end # def register
+
+   private
+
+   def parse_http_hosts(hosts, resolver)
+      ip_re = /^[\d]+\.[\d]+\.[\d]+\.[\d]+$/
+
+      lambda {
+         hosts.flat_map { |h|
+            scheme = URI(h).scheme
+            host = URI(h).host
+            port = URI(h).port
+            path = URI(h).path
+
+            if ip_re !~ host
+               resolver.get_addresses(host).map { |ip|
+                  "#{scheme}://#{ip}:#{port}#{path}"
+               }
+            else
+               [h]
+            end
+         }
+      }
+   end
+
+   private
+
+   def get_host_addresses()
+      begin
+         @hostnames_pool.call
+      rescue Exception => ex
+         @logger.error('Error while resolving host', :error => ex.to_s)
+      end
+   end
+
+   # This module currently does not support parallel requests as that would circumvent the batching
+   def receive(event)
+      buffer_receive(event)
+   end
+
+   public
+   def flush(events, close=false)
+      documents = ""
+      event_num = 0
+      events.each do |event|
+         documents << event.get("[message]") << "\n"
+         event_num += 1
+      end
+
+      @logger.info("get event num: #{event_num}")
+      @logger.debug("get documents: #{documents}")
+
+      hosts = get_host_addresses()
+
+      @request_headers["label"] = label_prefix + "_" + @db + "_" + @table + "_" + Time.now.strftime('%Y%m%d%H%M%S_%L')
+      make_request(documents, hosts, @http_query, 1, hosts.sample)
+   end
+
+   private
+
+   def save_to_disk(documents)
+      file_name = "#{save_dir}/#{db}_#{table}_#{save_file}"
+      begin
+         file = File.open(file_name, "a")
+         file.write(documents)
+      rescue IOError, SystemCallError => e
+         log_failure("An error occurred while saving file to disk: #{e}", :file_name => file_name)
+      ensure
+         file.close unless file.nil?
+      end
+   end
+
+
+   private
+
+   def make_request(documents, hosts, query, req_count = 1,host = "", uuid = SecureRandom.hex)
+
+      if host == ""
+         host = hosts.pop
+      end
+
+      url = host+query
+      @logger.debug("req count: #{req_count}. get url: #{url}")
+      @logger.debug("request headers: ", @request_headers)
+      
+
+      result = RestClient.put(url, documents,@request_headers) { |response, request, result|
+                case response.code
+                when 301, 302, 307
+                    @logger.debug("redirect to: #{response.headers[:location]}")
+                    response.follow_redirection
+                else
+                    response.return!
+                end
+            }
+
+      @logger.info("response : \n #{result}" )
+      result_body = JSON.parse(result.body)
+      if result_body['Status'] != "Success"
+         if req_count < @automatic_retries
+            @logger.warn("Response Status : #{result_body['Status']} . Retrying...... #{req_count}")
+            make_request(documents,hosts,query,req_count + 1,host,uuid)
+            return
+         end
+         @logger.warn("Load failed ! Try #{req_count} times.")
+         if @save_on_failure
+            @logger.warn("Retries exhausted after #{req_count} attempts. Saving to disk. File path: #{save_dir}/#{db}_#{table}_#{save_file}")
+            save_to_disk(documents)
+         end
+      end
+
+   end # def make_request
+
+    # This is split into a separate method mostly to help testing
+   def log_failure(message, opts)
+      @logger.warn("[HTTP Output Failure] #{message}", opts)
+   end
+
+   def make_request_headers()
+      headers = @headers || {}
+      headers["Expect"] ||= "100-continue"
+      headers["Content-Type"] ||= "text/plain;charset=utf-8"
+      headers["strict_mode"] ||= @strict_mode
+      headers["Authorization"] = "Basic " + Base64.strict_encode64("#{user}:#{password.value}")
+      # column_separator
+      if @column_separator != ""
+         headers["column_separator"] = @column_separator
+      end
+      # timezone
+      if @timezone != ""
+           headers["timezone"] = @timezone
+      end
+      # partition
+      if @partition.size > 0
+           headers["partition"] ||= @partition.join(",")
+      end
+      # where
+      if @where != ""
+           headers["where"] ||= @where
+      end
+      # timeout
+      if @timeout != -1
+           headers["timeout"] ||= @timeout
+      end
+      # max_filter_ratio
+      if @max_filter_ratio != -1
+           headers["max_filter_ratio"] ||= @max_filter_ratio
+      end
+      # exec_mem_limit
+      if @exec_mem_limit != -1
+           headers["exec_mem_limit"] ||= @exec_mem_limit
+      end
+      # columns
+      if @columns != ""
+          headers["columns"] ||= @columns
+      end
+      headers
+   end
+end # end of class LogStash::Outputs::Doris
+
+
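Note on `make_request`: the retry logic above is recursive, with `req_count` starting at 1 and a retry happening only while `req_count < @automatic_retries`. Read that way, `automatic_retries` bounds the total number of attempts (3 by default, meaning one initial request plus two retries). The sketch below restates that control flow as a loop. The names `load_with_retries` and `sender` are hypothetical, and `sender` stands in for the HTTP PUT, returning the parsed response as a Hash with a "Status" key.

```ruby
# Hypothetical restatement of the retry policy: try up to max_attempts times,
# stop at the first response whose "Status" is "Success".
def load_with_retries(body, max_attempts:, sender:)
  attempts = 0
  while attempts < max_attempts
    attempts += 1
    result = sender.call(body)
    return { ok: true, attempts: attempts } if result["Status"] == "Success"
  end
  # The caller would save the batch to disk here when save_on_failure is set.
  { ok: false, attempts: attempts }
end

# Usage with a stub that fails twice and then succeeds.
responses = [{ "Status" => "Fail" }, { "Status" => "Fail" }, { "Status" => "Success" }]
stub = ->(_body) { responses.shift }
outcome = load_with_retries("a,b,c,d,e\n", max_attempts: 3, sender: stub)
puts outcome.inspect
```

A loop avoids growing the call stack and makes the attempt count explicit, at the cost of diverging slightly from the structure of the committed code.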
diff --git a/extension/logstash/lib/logstash/util/shortname_resolver.rb b/extension/logstash/lib/logstash/util/shortname_resolver.rb
new file mode 100644
index 0000000..1437ccb
--- /dev/null
+++ b/extension/logstash/lib/logstash/util/shortname_resolver.rb
@@ -0,0 +1,58 @@
+=begin
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+=end
+require 'resolv'
+require 'mini_cache'
+
+class ShortNameResolver
+  def initialize(ttl:, logger:)
+    @ttl = ttl
+    @store = MiniCache::Store.new
+    @logger = logger
+  end
+
+  private
+  def resolve_cached(shortname)
+    @store.get_or_set(shortname) do
+      addresses = resolve(shortname)
+      raise "Bad shortname '#{shortname}'" if addresses.empty?
+      MiniCache::Data.new(addresses, expires_in: @ttl)
+    end
+  end
+
+  private
+  def resolve(shortname)
+    addresses = Resolv::DNS.open do |dns|
+      dns.getaddresses(shortname).map { |r| r.to_s }
+    end
+
+    @logger.info("Resolved shortname '#{shortname}' to addresses #{addresses}")
+
+    return addresses
+  end
+
+  public
+  def get_address(shortname)
+    return resolve_cached(shortname).sample
+  end
+
+  public
+  def get_addresses(shortname)
+    return resolve_cached(shortname)
+  end
+end
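Note on `ShortNameResolver`: it caches resolved addresses for `ttl` seconds through the mini_cache gem. To make the caching behaviour concrete without that dependency or real DNS, the sketch below uses a small hand-written TTL cache and an injectable clock. `TtlCache`, its `fetch` method and the `clock:` parameter are assumptions made for illustration, not the mini_cache API.

```ruby
# Minimal TTL cache sketch, standing in for the cache used by the resolver.
class TtlCache
  def initialize(ttl_sec, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @ttl_sec = ttl_sec
    @clock = clock
    @entries = {}
  end

  # Returns the cached value for key while it is fresh; otherwise runs the
  # block, stores its result with a new expiry time, and returns it.
  def fetch(key)
    now = @clock.call
    entry = @entries[key]
    return entry[:value] if entry && now < entry[:expires_at]

    value = yield
    @entries[key] = { value: value, expires_at: now + @ttl_sec }
    value
  end
end

# Usage with a fake clock and a stubbed lookup instead of real DNS.
fake_now = 0.0
lookups = 0
cache = TtlCache.new(120, clock: -> { fake_now })
resolve = -> { lookups += 1; ["10.0.0.1", "10.0.0.2"] }

cache.fetch("fe1") { resolve.call }   # first call performs the lookup
cache.fetch("fe1") { resolve.call }   # served from the cache
fake_now = 121.0
cache.fetch("fe1") { resolve.call }   # entry expired, lookup runs again
puts lookups
```

Injecting the clock keeps the example deterministic; the committed resolver relies on mini_cache's own expiry handling instead.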
diff --git a/extension/logstash/logstash-output-doris.gemspec b/extension/logstash/logstash-output-doris.gemspec
new file mode 100644
index 0000000..91d28f0
--- /dev/null
+++ b/extension/logstash/logstash-output-doris.gemspec
@@ -0,0 +1,47 @@
+=begin
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+=end
+Gem::Specification.new do |s|
+  s.name            = 'logstash-output-doris'
+  s.version         = '0.1.0'
+  s.author          = 'wfjcmcb'
+  s.email           = 'dev@doris.apache.org'
+  s.homepage        = 'http://doris.apache.org'
+  s.licenses        = ['Apache-2.0']
+  s.summary         = "This output lets you `PUT` messages in a batched fashion to Doris HTTP endpoint"
+  s.description     = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
+  s.require_paths = ["lib"]
+
+  # Files
+  s.files = Dir['lib/**/*','spec/**/*','*.gemspec','*.md','Gemfile','LICENSE' ]
+
+  # Tests
+  s.test_files = s.files.grep(%r{^(test|spec|features)/})
+
+  # Special flag to let us know this is actually a logstash plugin
+  s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
+
+  # Gem dependencies
+  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
+  s.add_runtime_dependency 'mini_cache', ">= 1.0.0", "< 2.0.0"
+  s.add_runtime_dependency "rest-client", '~> 2.1'
+
+  s.add_development_dependency 'logstash-devutils', '~> 2.0', '>= 2.0.3'
+  s.add_development_dependency 'sinatra', '~> 2.0', '>= 2.0.8.1'
+  s.add_development_dependency 'webrick', '~> 1.6'
+end

