Posted to dev@orc.apache.org by Ryan Schachte <co...@ryan-schachte.com> on 2020/09/11 07:11:58 UTC

ORC vector rollback

I'm writing a streaming application that converts incoming data into ORC in
real-time. One thing I'm implementing is a dead-letter queue that still
allows me to continue the batch processing even if a single record fails.

The caveat is that I want to remove the data that has been written thus
far if a failure occurs on, say, the 6th column out of 10 columns. For
example:

I write the following data:

{
 firstName: blah1,
 lastName: blah2,
 otherData: blah3
}

My question is, if I fail on otherData, I want to "rollback" the data from
the column vectors at the current vectorPosition I'm iterating on. Is it as
simple as setting colVector.isNull[vectorPosition] to true and setting
colVector.noNulls to false? I originally wanted to go into the index for
each column vector and override it, but I don't see an easy way to do that.

Cheers!!
Ryan Schachte

Re: ORC vector rollback

Posted by Ryan Schachte <co...@gmail.com>.
Hi Owen,
Great. This is actually what I needed confirmation on. All seems to be good
now. If I encounter a failure now, I just don't increase the size of my
vectorized row batch, and my smoke tests seem to satisfy what I want.
Thanks!

Re: ORC vector rollback

Posted by Owen O'Malley <ow...@gmail.com>.
What I'd propose is that in addToVector, which I assume is your code, you
catch exceptions and roll back the VectorizedRowBatch.size to the previous
row by subtracting one. That will effectively wipe out the previous partial
row. For complex types, you won't reclaim the values, but they won't be
written to the file.

.. Owen
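
One way to picture the suggestion above, as a minimal sketch only: it uses simplified stand-in classes (Batch, LongColumn) rather than the real VectorizedRowBatch and ColumnVector, and addToVector, appendRecord, and the dead-letter list are hypothetical names. The idea it illustrates is that rows at index size or beyond are not part of the batch, so leaving size unchanged after a failure lets the next record overwrite the partial one.

```java
import java.util.ArrayList;
import java.util.List;

class RollbackSketch {
    // Simplified stand-in for a long-typed column vector (not the real ORC class).
    static final class LongColumn {
        final long[] vector;
        LongColumn(int capacity) { vector = new long[capacity]; }
    }

    // Simplified stand-in for VectorizedRowBatch: only rows [0, size) count.
    static final class Batch {
        final LongColumn[] cols;
        int size = 0;
        Batch(int numCols, int capacity) {
            cols = new LongColumn[numCols];
            for (int c = 0; c < numCols; c++) {
                cols[c] = new LongColumn(capacity);
            }
        }
    }

    // Hypothetical per-field converter; throws NumberFormatException on bad input.
    static void addToVector(LongColumn col, String raw, int pos) {
        col.vector[pos] = Long.parseLong(raw);
    }

    // Writes one record column by column. On failure the record goes to the
    // dead-letter list and size is left untouched, so the partially written
    // slot is simply reused by the next record.
    static boolean appendRecord(Batch batch, String[] record, List<String[]> deadLetters) {
        int pos = batch.size;
        try {
            for (int c = 0; c < batch.cols.length; c++) {
                addToVector(batch.cols[c], record[c], pos);
            }
        } catch (RuntimeException e) {
            deadLetters.add(record);
            return false;
        }
        batch.size = pos + 1; // commit the row only after every column succeeded
        return true;
    }

    // Appends three records, the second of which fails on its middle column.
    static Batch demo(List<String[]> deadLetters) {
        Batch batch = new Batch(3, 4);
        appendRecord(batch, new String[] {"1", "2", "3"}, deadLetters);
        appendRecord(batch, new String[] {"4", "oops", "6"}, deadLetters);
        appendRecord(batch, new String[] {"7", "8", "9"}, deadLetters);
        return batch;
    }

    public static void main(String[] args) {
        List<String[]> deadLetters = new ArrayList<>();
        Batch batch = demo(deadLetters);
        System.out.println("rows kept: " + batch.size
            + ", dead letters: " + deadLetters.size());
    }
}
```

With the real classes, the next record written to that position should set every column's value and its isNull flag explicitly, since stale values from the failed record stay in the arrays.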

Re: ORC vector rollback

Posted by Ryan Schachte <co...@gmail.com>.
Hi Owen,
Thanks for the quick response.

Essentially, I have a real-time Avro -> ORC conversion process. I do the
conversion myself using the Java API. If I hit a serialization failure or
similar (internally in my code), I push the record to a queue to handle
offline.
However, since I write the data for a single record column vector by column
vector, I want to make sure I don't have partial data from the failed
record still in the vector positions for that failed record.

Here is a small snippet to elucidate what I'm doing. *addToVector* could
fail for any sort of reason, so I track the failed Avro record in a
separate thread, but I want to make sure that, for that vectorPosition, the
other column vectors are reset. Maybe to defaults? Maybe it's a dumb
question, but I can't figure out a smart way to do that, or whether I'm
thinking about the rollback idea correctly. Hopefully that is clear. Thanks Owen!

for (int c = 0; c < batch.numCols; c++) {
  ColumnVector colVector = batch.cols[c];
  final String thisField = orcSchema.getFieldNames().get(c);
  int vectorPosition = batch.size;

  Logger.orcConversionStatus(LOGGER_TRACE_ID, CLASS_LOCATION,
      String.format("Processing field: %s", thisField));
  final TypeDescription type = orcSchema.getChildren().get(c);

  Object fieldValue = record.get(thisField);
  Schema.Field avroField = currSchema.getField(thisField);

  // If this fails on some column X, I want to roll back the data I've
  // written so far for this record (the columns before X).
  addToVector(type, colVector, avroField.schema(), fieldValue, vectorPosition);
}


Re: ORC vector rollback

Posted by Owen O'Malley <ow...@gmail.com>.
Where is the failure happening? If it is happening in the ORC writer code,
there isn't a way to do that. Can I ask what kind of exception you are
hitting? In the column (aka tree) writers, there shouldn't be much that can
go wrong. They don't even write to the file handle; they just buffer in
memory.

If the problem is in your code, you should be able to use the selected
vector in the VectorizedRowBatch to just select the other rows.

.. Owen
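
As a rough illustration of the selected-vector idea above: VectorizedRowBatch carries a selected index array and a selectedInUse flag, and when the flag is set, size counts the selected rows. The sketch below uses a simplified stand-in Batch class rather than the real one, and the failed[] bookkeeping and selectSurvivors helper are hypothetical. Whether a particular ORC writer version honors these fields is an assumption worth verifying before relying on it.

```java
import java.util.Arrays;

class SelectedSketch {
    // Simplified stand-in mirroring three fields of VectorizedRowBatch.
    static final class Batch {
        int size;
        boolean selectedInUse;
        final int[] selected;
        Batch(int capacity) { selected = new int[capacity]; }
    }

    // failed[row] is true when that row hit a conversion error (hypothetical
    // bookkeeping kept by the caller). Only surviving row indices are selected.
    static void selectSurvivors(Batch batch, boolean[] failed, int rowsWritten) {
        int kept = 0;
        for (int row = 0; row < rowsWritten; row++) {
            if (!failed[row]) {
                batch.selected[kept++] = row;
            }
        }
        batch.size = kept;                        // number of selected rows
        batch.selectedInUse = kept < rowsWritten; // only needed if something was dropped
    }

    // Five rows were written; rows 1 and 4 failed.
    static int[] demo() {
        Batch batch = new Batch(8);
        boolean[] failed = {false, true, false, false, true};
        selectSurvivors(batch, failed, failed.length);
        return Arrays.copyOf(batch.selected, batch.size);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

Compared with leaving size unchanged on failure, this keeps failed rows in the arrays and skips them by index, which may suit cases where failures are only detected after later rows have already been written.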
