Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/24 11:31:32 UTC
[GitHub] [airflow] TheRum opened a new issue #14417: DruidOperator failing to submit ingestion tasks : Getting 500 error code from Druid
TheRum opened a new issue #14417:
URL: https://github.com/apache/airflow/issues/14417
Issue: Submitting an ingestion task with DruidOperator returns a 500 error code from Druid, and no task appears in the Druid console.
In Airflow 1.10.x everything works. After upgrading to 2.0.1, the task fails to submit. Nothing changed in the code or files except the import statements.
Resolution: I compared the DruidOperator code in Airflow 1.10.x and 2.0.1 and found one line causing the issue.
In Airflow 2.0.x, the indexing job JSON string is converted to a Python object before it is submitted, but it should stay a JSON string.
In Airflow 1.10.x no conversion happens, so it works. (See the code snippets below.)
I have already tried this change in my setup and re-ran the ingestion tasks; they all work.
~~hook.submit_indexing_job(json.loads(self.json_index_file))~~
**hook.submit_indexing_job(self.json_index_file)**
Airflow 1.10.x - airflow/contrib/operators/druid_operator.py
```
def execute(self, context):
    hook = DruidHook(
        druid_ingest_conn_id=self.conn_id,
        max_ingestion_time=self.max_ingestion_time
    )
    self.log.info("Submitting %s", self.index_spec_str)
    hook.submit_indexing_job(self.index_spec_str)
```
Airflow 2.0.1 - airflow/providers/apache/druid/operators/druid.py
```
def execute(self, context: Dict[Any, Any]) -> None:
    hook = DruidHook(druid_ingest_conn_id=self.conn_id, max_ingestion_time=self.max_ingestion_time)
    self.log.info("Submitting %s", self.json_index_file)
    hook.submit_indexing_job(json.loads(self.json_index_file))
```
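The difference between the two calls can be reproduced without Druid or Airflow. This is only a minimal sketch: the content of `spec_str` is a made-up, heavily simplified index spec, and `is_valid_json` is a hypothetical helper. It shows that `json.loads` turns the JSON string into a Python dict, and that a dict's Python text form is not JSON unless it is serialized again.

```python
import json

# Hypothetical, heavily simplified index spec; a real Druid spec has many more fields.
spec_str = '{"type": "index_parallel", "spec": {"dataSchema": {"dataSource": "example"}}}'

# What the 2.0.1 operator hands to the hook: a Python dict, not a string.
spec_obj = json.loads(spec_str)

# A dict's Python representation uses single quotes, so it is not valid JSON.
python_repr = str(spec_obj)


def is_valid_json(text: str) -> bool:
    """Return True if text parses as JSON (hypothetical helper for this sketch)."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


print(type(spec_obj).__name__)               # dict
print(is_valid_json(spec_str))               # True
print(is_valid_json(python_repr))            # False
print(is_valid_json(json.dumps(spec_obj)))   # True
```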
**Apache Airflow version**: 2.0.x
**Error Logs**:
```
[2021-02-24 06:42:24,287] {{connectionpool.py:452}} DEBUG - http://druid-master:8081 "POST /druid/indexer/v1/task HTTP/1.1" 500 15714
[2021-02-24 06:42:24,287] {{taskinstance.py:570}} DEBUG - Refreshing TaskInstance <TaskInstance: druid_compact_daily 2021-02-23T01:20:00+00:00 [running]> from DB
[2021-02-24 06:42:24,296] {{taskinstance.py:605}} DEBUG - Refreshed TaskInstance <TaskInstance: druid_compact_daily 2021-02-23T01:20:00+00:00 [running]>
[2021-02-24 06:42:24,298] {{taskinstance.py:1455}} ERROR - Did not get 200 when submitting the Druid job to http://druid-master.avesta.stg:8081/druid/indexer/v1/task
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil closed issue #14417: DruidOperator failing to submit ingestion tasks : Getting 500 error code from Druid
Posted by GitBox <gi...@apache.org>.
kaxil closed issue #14417:
URL: https://github.com/apache/airflow/issues/14417
[GitHub] [airflow] glingle05 commented on issue #14417: DruidOperator failing to submit ingestion tasks : Getting 500 error code from Druid
Posted by GitBox <gi...@apache.org>.
glingle05 commented on issue #14417:
URL: https://github.com/apache/airflow/issues/14417#issuecomment-900799207
This fix is a workaround for a bug in DruidHook.submit_indexing_job() itself.
The requests.post line in submit_indexing_job is the real problem:
`req_index = requests.post(url, data=json_index_spec, headers=self.header, auth=self.get_auth())`
should be
`req_index = requests.post(url, json=json_index_spec, headers=self.header, auth=self.get_auth())`
Note that passing the json_index_spec dict through the data argument means it is not serialized with the json library
(requests form-encodes a dict passed as data), so the payload is not JSON-compliant.
If json_index_spec is passed through the json parameter, it is serialized with the json library,
string keys and values are quoted with double quotes ("), and the request succeeds.
HTH, Glenn
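One way to make the submission tolerant of both call styles discussed in this thread is to normalize the spec to a JSON string before it is posted. The sketch below is an illustration only: `to_json_payload` is a hypothetical helper and is not part of Airflow or requests.

```python
import json
from typing import Any, Dict, Union


def to_json_payload(spec: Union[str, Dict[str, Any]]) -> str:
    """Return spec as a JSON string, whether it arrives as a JSON string or a dict."""
    if isinstance(spec, str):
        # Parse once so malformed JSON fails here instead of at the Druid endpoint.
        json.loads(spec)
        return spec
    # A dict is serialized with the json library, which emits double-quoted strings.
    return json.dumps(spec)
```

A string passed through the data argument of requests.post is sent as the request body unchanged, so `data=to_json_payload(json_index_spec)` would post valid JSON for either input type.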
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] boring-cyborg[bot] commented on issue #14417: DruidOperator failing to submit ingestion tasks : Getting 500 error code from Druid
Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #14417:
URL: https://github.com/apache/airflow/issues/14417#issuecomment-785012624
Thanks for opening your first issue here! Be sure to follow the issue template!