Posted to user-zh@flink.apache.org by whh_960101 <wh...@163.com> on 2020/10/22 07:47:16 UTC

pyflink1.11.0: how to set the username and password in the connector when the Elasticsearch host requires authentication

Hi everyone,

I need to sink data to an Elasticsearch host that requires authentication. How do I pass a username and password to the elasticsearch connector? I followed the example on the official site, but I could not find any option for a username or password, and the job fails with an Unauthorized error.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html

CREATE TABLE myUserTable (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);

Connector Options
| Option | Required | Default | Type | Description |
| connector | required | (none) | String | Specify what connector to use. Valid values are: elasticsearch-6 (connect to Elasticsearch 6.x cluster); elasticsearch-7 (connect to Elasticsearch 7.x and later versions cluster). |
| hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
| index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the Dynamic Index section for more details. |
| document-type | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
| document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3". |
| failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail (throws an exception if a request fails and thus causes a job failure); ignore (ignores failures and drops the request); retry_rejected (re-adds requests that have failed due to queue capacity saturation); custom class name (for failure handling with an ActionRequestFailureHandler subclass). |
| sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
| sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
| sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
| sink.bulk-flush.interval | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note that both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions. |
| sink.bulk-flush.backoff.strategy | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: DISABLED (no retry performed, i.e. fail after the first request error); CONSTANT (wait for backoff delay between retries); EXPONENTIAL (initially wait for backoff delay and increase exponentially between retries). |
| sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
| sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
| connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
| connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1'. |
| format | optional | json | String | The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document. By default, the built-in 'json' format is used. Please refer to the JSON Format page for more details. |
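To make the dynamic index option listed above more concrete, here is a minimal Python sketch of how a pattern such as 'index-{log_ts|yyyy-MM-dd}' could resolve to a concrete index name for one row. The helper resolve_dynamic_index and its token mapping are hypothetical and exist only for illustration; this is not Flink's implementation, and it handles just a few date tokens.

```python
from datetime import datetime

# Hypothetical mapping from the Java-style date tokens in the docs example
# to Python strftime directives. Only these four tokens are handled.
_JAVA_TO_STRFTIME = [("yyyy", "%Y"), ("MM", "%m"), ("dd", "%d"), ("HH", "%H")]


def resolve_dynamic_index(pattern, row):
    """Resolve 'prefix-{field|date_pattern}' against one row (illustration only)."""
    start, end = pattern.find("{"), pattern.find("}")
    if start == -1 or end == -1:
        return pattern  # static index: nothing to substitute
    field, _, java_fmt = pattern[start + 1:end].partition("|")
    value = row[field]
    if java_fmt:
        fmt = java_fmt
        for java_token, py_token in _JAVA_TO_STRFTIME:
            fmt = fmt.replace(java_token, py_token)
        value = value.strftime(fmt)
    return pattern[:start] + str(value) + pattern[end + 1:]


row = {"log_ts": datetime(2020, 10, 22, 7, 47)}
print(resolve_dynamic_index("index-{log_ts|yyyy-MM-dd}", row))  # index-2020-10-22
```

With a static pattern such as 'users', the helper returns the name unchanged, which mirrors the static versus dynamic distinction in the option description.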






 

Re: Re: pyflink1.11.0: how to set the username and password in the connector when the Elasticsearch host requires authentication

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

PyFlink 1.11 does not support the DataStream API yet; it is only available starting with 1.12.

Best,
Xingbo

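On Tue, Oct 27, 2020 at 2:58 PM, whh_960101 <wh...@163.com> wrote:

> Is there any other way to supply the username and password? As far as I know, Java Flink has an entry point for a username and password when accessing Elasticsearch, and since pyflink calls into Java to execute, it should have one too, right? Could anyone give me some pointers? Thanks!
>
> On 2020-10-22 16:34:56, "Yangze Guo" <ka...@gmail.com> wrote:
> >Setting a username and password is not supported in 1.11 yet. These two options were added to the new ES connector in 1.12 [1].
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo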

Re:Re: pyflink1.11.0: how to set the username and password in the connector when the Elasticsearch host requires authentication

Posted by whh_960101 <wh...@163.com>.
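Is there any other way to supply the username and password? As far as I know, Java Flink has an entry point for a username and password when accessing Elasticsearch, and since pyflink calls into Java to execute, it should have one too, right? Could anyone give me some pointers? Thanks!

On 2020-10-22 16:34:56, "Yangze Guo" <ka...@gmail.com> wrote:
>Setting a username and password is not supported in 1.11 yet. These two options were added to the new ES connector in 1.12 [1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo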





 

Re:Re: pyflink1.11.0: how to set the username and password in the connector when the Elasticsearch host requires authentication

Posted by whh_960101 <wh...@163.com>.
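Can I upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround?

On 2020-10-22 16:34:56, "Yangze Guo" <ka...@gmail.com> wrote:
>Setting a username and password is not supported in 1.11 yet. These two options were added to the new ES connector in 1.12 [1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo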

Re: Re: pyflink1.11.0: how to set the username and password in the connector when the Elasticsearch host requires authentication

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

You can build and install it from source. See the documentation [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink

Best,
Xingbo

On Thu, Oct 22, 2020 at 6:47 PM, whh_960101 <wh...@163.com> wrote:

> Can I upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround?
>
> On 2020-10-22 16:34:56, "Yangze Guo" <ka...@gmail.com> wrote:
> >Setting a username and password is not supported in 1.11 yet. These two options were added to the new ES connector in 1.12 [1].
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo

Re:Re: pyflink1.11.0: how to set the username and password in the connector when the Elasticsearch host requires authentication

Posted by whh_960101 <wh...@163.com>.
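Can I upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround?

On 2020-10-22 16:34:56, "Yangze Guo" <ka...@gmail.com> wrote:
>Setting a username and password is not supported in 1.11 yet. These two options were added to the new ES connector in 1.12 [1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo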

Re:Re: pyflink1.11.0: how to set the username and password in the connector when the Elasticsearch host requires authentication

Posted by whh_960101 <wh...@163.com>.
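Can I upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround?

On 2020-10-22 16:34:56, "Yangze Guo" <ka...@gmail.com> wrote:
>Setting a username and password is not supported in 1.11 yet. These two options were added to the new ES connector in 1.12 [1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo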
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <wh...@163.com> wrote:
>>
>> Hi,各位大佬们:如果要sink数据到elasticsearch host有访问权限,elasticsearch connector如何写入用户名密码我按照官网里的样例格式来写的,没有找到options写入用户名密码,程序报错Unauthorizedhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.htmlCREATE TABLE myUserTable (
>>   user_id STRING,
>>   user_name STRING
>>   uv BIGINT,
>>   pv BIGINT,
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'http://localhost:9200',
>>   'index' = 'users'
>> );Connector Options
>> | Option | Required | Default | Type | Description |
>> |
>> connector
>> | required | (none) | String | Specify what connector to use, valid values are:
>> elasticsearch-6: connect to Elasticsearch 6.x cluster
>> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
>> |
>> |
>> hosts
>> | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
>> |
>> index
>> | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Indexsection for more details. |
>> |
>> document-type
>> | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
>> |
>> document-id.key-delimiter
>> | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
>> | failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail (throws an exception if a request fails and thus causes a job failure); ignore (ignores failures and drops the request); retry_rejected (re-adds requests that have failed due to queue capacity saturation); custom class name (for failure handling with an ActionRequestFailureHandler subclass). |
>> | sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
>> | sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
>> | sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
>> | sink.bulk-flush.interval | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note that both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions. |
>> | sink.bulk-flush.backoff.strategy | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: DISABLED (no retry performed, i.e. fail after the first request error); CONSTANT (wait for backoff delay between retries); EXPONENTIAL (initially wait for backoff delay and increase exponentially between retries). |
>> | sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
>> | sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
>> | connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
>> | connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1'. |
>> | format | optional | json | String | The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document. By default it uses the built-in 'json' format. Please refer to the JSON Format page for more details. |

Re: pyflink 1.11.0: how to set a username and password in the connector when the Elasticsearch host requires authentication

Posted by Yangze Guo <ka...@gmail.com>.
Version 1.11 does not yet support setting a username and password. These two options were added to the new ES connector in 1.12 [1].

[1] https://issues.apache.org/jira/browse/FLINK-18361
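
For illustration, here is a minimal sketch of what the table definition could look like after upgrading to 1.12, assuming the connector options are named 'username' and 'password' as described in [1]. The helper build_es_sink_ddl and the credential values are hypothetical placeholders, not part of Flink; the sketch only builds the DDL string and does not need a running cluster.

```python
def build_es_sink_ddl(hosts, index, username, password):
    """Return a CREATE TABLE statement for an authenticated Elasticsearch 7 sink.

    Assumes Flink 1.12 or later, where the 'username' and 'password'
    connector options are available (see FLINK-18361).
    """
    def quote(value):
        # Flink SQL string literals escape a single quote by doubling it.
        return "'" + value.replace("'", "''") + "'"

    options = {
        "connector": "elasticsearch-7",
        "hosts": hosts,
        "index": index,
        "username": username,  # assumed option name, added in 1.12
        "password": password,  # assumed option name, added in 1.12
    }
    with_clause = ",\n".join(
        "  {} = {}".format(quote(key), quote(value))
        for key, value in options.items()
    )
    return (
        "CREATE TABLE myUserTable (\n"
        "  user_id STRING,\n"
        "  user_name STRING,\n"
        "  uv BIGINT,\n"
        "  pv BIGINT,\n"
        "  PRIMARY KEY (user_id) NOT ENFORCED\n"
        ") WITH (\n" + with_clause + "\n)"
    )


# Placeholder credentials; replace with real values.
ddl = build_es_sink_ddl("http://localhost:9200", "users", "elastic", "changeme")
print(ddl)
# With PyFlink 1.12+, the statement could then be submitted through an
# existing TableEnvironment, e.g. t_env.execute_sql(ddl).
```

On 1.11 these two options are not recognized, so the same DDL would only work after upgrading the connector.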

Best,
Yangze Guo

On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <wh...@163.com> wrote:
>
> Hi everyone: if I want to sink data to an Elasticsearch host that requires authentication, how do I set the username and password in the Elasticsearch connector? I followed the example format from the official documentation, but I could not find any options for a username and password, and the program fails with Unauthorized.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
>
> CREATE TABLE myUserTable (
>   user_id STRING,
>   user_name STRING,
>   uv BIGINT,
>   pv BIGINT,
>   PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'http://localhost:9200',
>   'index' = 'users'
> );
>
> Connector Options
> | Option | Required | Default | Type | Description |
> | connector | required | (none) | String | Specify what connector to use. Valid values are: elasticsearch-6 (connect to an Elasticsearch 6.x cluster); elasticsearch-7 (connect to an Elasticsearch 7.x or later cluster). |
> | hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> | index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the Dynamic Index section for more details. |
> | document-type | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
> | document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3". |
> | failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail (throws an exception if a request fails and thus causes a job failure); ignore (ignores failures and drops the request); retry_rejected (re-adds requests that have failed due to queue capacity saturation); custom class name (for failure handling with an ActionRequestFailureHandler subclass). |
> | sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
> | sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
> | sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
> | sink.bulk-flush.interval | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note that both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions. |
> | sink.bulk-flush.backoff.strategy | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: DISABLED (no retry performed, i.e. fail after the first request error); CONSTANT (wait for backoff delay between retries); EXPONENTIAL (initially wait for backoff delay and increase exponentially between retries). |
> | sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
> | sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
> | connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
> | connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1'. |
> | format | optional | json | String | The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document. By default it uses the built-in 'json' format. Please refer to the JSON Format page for more details. |