You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@airflow.apache.org by lewis john mcgibbney <le...@apache.org> on 2022/03/04 18:01:30 UTC

Optimal mechanism for loading data into KubernetesPodOperator

Hi users,
We are using the KubernetesPodOperator to isolate some code which writes
data into a VERY OLD Elasticsearch 2.X cluster. Please don't make fun of me
for this!!!
We are wondering, does a recommended practice exists for processing (JSON)
data within the KubernetesPodOperator?
Currently, we've experimented with passing various volumes of JSON string
data to the KubernetesPodOperator 'argument' parameter. This works for
reasonably small record batches such as 100's but fails for >10k's records.
Should we be using a custom XCom backend to pull data into the container
rather than push it via 'arguments'?
Thank you
lewismc

-- 
http://home.apache.org/~lewismc/
http://people.apache.org/keys/committer/lewismc

Re: Optimal mechanism for loading data into KubernetesPodOperator

Posted by Eric Chiu <es...@gmail.com>.
So in order to get around running airflow inside the KPO, we do the
following:
We know that the meta-database stores the XCOM data or in this case,
because of the enabled custom xcom, stores the s3 filename.
We know that to get to the xcom s3 file name we can query the xcom table
based on the primary keys (dag_id, task_id and execution_date).
Thus in our KPO we query the xcom table to get the s3 filename and then
proceed to retrieve the s3 data via aws boto3.
I am not sure if this is the most efficient way or even if it is the
correct way to do it, but it seems to work.
We do realize that if the xcom table changes or changes its primary keys
that this would stop working.

On Fri, Mar 4, 2022 at 12:08 PM Daniel Standish <dp...@gmail.com>
wrote:

> > Absolutely. We wrote a custom AWS S3 XCom backend to do exactly that.
>
> Well if you have it all working then what are we yabbering about :)
>
> I think custom XCom requires that your container is running airflow -- but
> you are using KPO so I don't know if that's true?  Either that or it forces
> you to pull the data in via templating but if it's large data that's back
> to your orig problem.  Did your custom backend work for you?  If so, what's
> the problem you're facing?  If it works, then great and no reason to second
> guess it I'd think.
>
> You certainly don't _need_ to use custom xcom.  And I am not sure you
> really need KPO?  Maybe you need an ancient client to work with your
> ancient ES cluster?  And modern client for most of your ES jobs?
>
> But anyway if I were doing this, my first thought would be not to bother
> with xcom (though I "grew up" in a pre-taskflow API world).
>
> my_data_path = "s3://bucket/key.txt"
> op1 = GetDataToS3(task_id='to_s3', path=my_data_path)
> op2 = LoadToES(task_id='to_es', path=my_data_path)
> op1 >> op3
>
>
>
>

Re: Optimal mechanism for loading data into KubernetesPodOperator

Posted by Daniel Standish <dp...@gmail.com>.
> Absolutely. We wrote a custom AWS S3 XCom backend to do exactly that.

Well if you have it all working then what are we yabbering about :)

I think custom XCom requires that your container is running airflow -- but
you are using KPO so I don't know if that's true?  Either that or it forces
you to pull the data in via templating but if it's large data that's back
to your orig problem.  Did your custom backend work for you?  If so, what's
the problem you're facing?  If it works, then great and no reason to second
guess it I'd think.

You certainly don't _need_ to use custom xcom.  And I am not sure you
really need KPO?  Maybe you need an ancient client to work with your
ancient ES cluster?  And modern client for most of your ES jobs?

But anyway if I were doing this, my first thought would be not to bother
with xcom (though I "grew up" in a pre-taskflow API world).

my_data_path = "s3://bucket/key.txt"
op1 = GetDataToS3(task_id='to_s3', path=my_data_path)
op2 = LoadToES(task_id='to_es', path=my_data_path)
op1 >> op3

Re: Optimal mechanism for loading data into KubernetesPodOperator

Posted by Lewis John McGibbney <le...@apache.org>.
Hi Daniel,
Thanks for engaging me on this one.

On 2022/03/04 18:17:58 Daniel Standish wrote:
> Where is the data coming from?  

From a previous Airflow task. 

Task A generate arbitrary JSON data
Task B process arbitrary JSON data in KubernetesPodOperator. Again, we use KubernetesPodOperator because we are trying to isolate this legacy feature and 'manage' it separately from other tasks.

> Can you refactor your task so that it reads
> data from cloud storage and pushes it into ES?  Rather than taking the data
> as an arg to the task.  So instead, your arg is
> `s3://blah-bucket/blah-file.ndjson.gz` or something.

Absolutely. We wrote a custom AWS S3 XCom backend to do exactly that.

Is that your guidance?

Thank you
lewismc

Re: Optimal mechanism for loading data into KubernetesPodOperator

Posted by Daniel Standish <dp...@gmail.com>.
Where is the data coming from?  Can you refactor your task so that it reads
data from cloud storage and pushes it into ES?  Rather than taking the data
as an arg to the task.  So instead, your arg is
`s3://blah-bucket/blah-file.ndjson.gz` or something.

On Fri, Mar 4, 2022 at 10:12 AM Lewis John McGibbney <le...@apache.org>
wrote:

> For example this is the message we get
>
> HTTP response body:
> {
>     "kind": "Status",
>     "apiVersion": "v1",
>     "metadata": {},
>     "status": "Failure",
>     "message": "rpc error: code = ResourceExhausted desc = grpc: trying to
> send message larger than max (11652891 vs. 2097152)",
>     "code": 500
> }
>
> I know that this indicates we have exceeded data volume however I am still
> curious to hear which architectural approach is 'better'.
>
> Thanks for any assistance.
> lewismc
>
> On 2022/03/04 18:01:30 lewis john mcgibbney wrote:
> > Hi users,
> > We are using the KubernetesPodOperator to isolate some code which writes
> > data into a VERY OLD Elasticsearch 2.X cluster. Please don't make fun of
> me
> > for this!!!
> > We are wondering, does a recommended practice exists for processing
> (JSON)
> > data within the KubernetesPodOperator?
> > Currently, we've experimented with passing various volumes of JSON string
> > data to the KubernetesPodOperator 'argument' parameter. This works for
> > reasonably small record batches such as 100's but fails for >10k's
> records.
> > Should we be using a custom XCom backend to pull data into the container
> > rather than push it via 'arguments'?
> > Thank you
> > lewismc
> >
> > --
> > http://home.apache.org/~lewismc/
> > http://people.apache.org/keys/committer/lewismc
> >
>

Re: Optimal mechanism for loading data into KubernetesPodOperator

Posted by Lewis John McGibbney <le...@apache.org>.
For example this is the message we get

HTTP response body: 
{
    "kind": "Status",
    "apiVersion": "v1",
    "metadata": {},
    "status": "Failure",
    "message": "rpc error: code = ResourceExhausted desc = grpc: trying to send message larger than max (11652891 vs. 2097152)",
    "code": 500
}

I know that this indicates we have exceeded data volume however I am still curious to hear which architectural approach is 'better'.

Thanks for any assistance.
lewismc

On 2022/03/04 18:01:30 lewis john mcgibbney wrote:
> Hi users,
> We are using the KubernetesPodOperator to isolate some code which writes
> data into a VERY OLD Elasticsearch 2.X cluster. Please don't make fun of me
> for this!!!
> We are wondering, does a recommended practice exists for processing (JSON)
> data within the KubernetesPodOperator?
> Currently, we've experimented with passing various volumes of JSON string
> data to the KubernetesPodOperator 'argument' parameter. This works for
> reasonably small record batches such as 100's but fails for >10k's records.
> Should we be using a custom XCom backend to pull data into the container
> rather than push it via 'arguments'?
> Thank you
> lewismc
> 
> -- 
> http://home.apache.org/~lewismc/
> http://people.apache.org/keys/committer/lewismc
>