You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by Priyanshu LNU <t-...@microsoft.com> on 2021/02/10 08:14:29 UTC

[java][apache-arrow] Having problem in appending record batches to same arrow file

Hi,
I am using Apache arrow in Java. I want to append stream of record batches to an already existing file.
That means, I have a fileOutputStream. Using ArrowStreamWriter, I wrote some batches to this fileOutputStream. Now I close the fileOutputStream and then again start this stream to append some batches to it.
The issue is that metadata gets appended again, so the streamreader is not able to read all the batches. So how Can I append stream of recordBatches to an existing file correctly without repetition of metadata in between?

Below is part of Java code I am currently using:

public void setupWrite(String filename, boolean useCustom) throws Exception {
        File arrowFile = validateFile(filename, false);
        this.fileOutputStream = new FileOutputStream(arrowFile);
        Schema schema = makeSchema();
        this.root = VectorSchemaRoot.create(schema, this.ra);
        DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
        this.arrowStreamWriter=new ArrowStreamWriter(root,provider,Channels.newChannel(this.fileOutputStream));

        // Just to show some stuff about the schema and layout
        System.out.println("Schema/Layout: ");
        for (Field field : root.getSchema().getFields()) {
            FieldVector vector = root.getVector(field.getName());
            showFieldLayout(field, vector);
        }
        System.out.println("Generated " + this.entries + " data entries , batch size " + batchSize + " usingCustomWriter: " + useCustom + " useNullValues " + this.useNullValues);

     // writing logic starts here
//      arrowFileWriter.start();
      arrowStreamWriter.start();

      for(int i = 0; i < this.entries;) {
          int toProcessItems = Math.min(this.batchSize, this.entries - i);
          // set the batch row count
          root.setRowCount(toProcessItems);
          for (Field field : root.getSchema().getFields()) {
              FieldVector vector = root.getVector(field.getName());
             // System.out.println(vector.getMinorType());
              switch (vector.getMinorType()) {
                  case INT:
                      writeFieldInt(vector, i, toProcessItems);
                      break;
                  case BIGINT:
                      writeFieldLong(vector, i, toProcessItems);
                      break;
                  case FLOAT4:
                      writeFieldFloat4(vector, i, toProcessItems);
                      break;
                  case VARCHAR:
                      writeFieldVarchar(vector, i, toProcessItems);
                      break;
                  case DATEDAY:
                      writeFieldDate(vector, i, toProcessItems);
                      break;
               // case VARBINARY:
                 //   writeFieldVarBinary(vector, i, toProcessItems);
                   // break;
                  default:
                      throw new Exception(" Not supported yet type: " + vector.getMinorType());
              }
          }
//          arrowFileWriter.writeBatch();
          arrowStreamWriter.writeBatch();
          i+=toProcessItems;
      }
      arrowStreamWriter.end();
      arrowStreamWriter.close();
      fileOutputStream.flush();
      fileOutputStream.close();






      this.fileOutputStream = new FileOutputStream(arrowFile,true);
      this.arrowStreamWriter=new ArrowStreamWriter(root,provider,this.fileOutputStream.getChannel());
      arrowStreamWriter.start();
      for(int i = 0; i < this.entries;) {
          int toProcessItems = Math.min(this.batchSize, this.entries - i);
          // set the batch row count
          root.setRowCount(toProcessItems);
          for (Field field : root.getSchema().getFields()) {
              FieldVector vector = root.getVector(field.getName());
             // System.out.println(vector.getMinorType());
              switch (vector.getMinorType()) {
                  case INT:
                      writeFieldInt(vector, i, toProcessItems);
                      break;
                  case BIGINT:
                      writeFieldLong(vector, i, toProcessItems);
                      break;
                  case FLOAT4:
                      writeFieldFloat4(vector, i, toProcessItems);
                      break;
                  case VARCHAR:
                      writeFieldVarchar(vector, i, toProcessItems);
                      break;
                  case DATEDAY:
                      writeFieldDate(vector, i, toProcessItems);
                      break;
               // case VARBINARY:
                 //   writeFieldVarBinary(vector, i, toProcessItems);
                   // break;
                  default:
                      throw new Exception(" Not supported yet type: " + vector.getMinorType());
              }
          arrowStreamWriter.writeBatch();
          i+=toProcessItems;
      }
      arrowStreamWriter.end();
      arrowStreamWriter.close();
      fileOutputStream.flush();
      fileOutputStream.close();
 }


Thanks,
Priyanshu


Re: [java][apache-arrow] Having problem in appending record batches to same arrow file

Posted by Micah Kornfield <em...@gmail.com>.
There isn't functionality like this that exists for appending data to a
file.  You could potentially open a file, read the footer back track to the
footer location and start writing again.  This wouldn't be atomic but cold
work (likely some lower level API would need to be exposed).

The other option is to write out an ArrowStream (so no random access) and
then start appending new record batches at the end (I would need to check
but it is possible there isn't an option to write a new schema to the same
file).

TL;DR; This would potentially require some refactoring of APIs.  I'm not
sure it is a good idea, what is the motivating use-case?

Thanks,
Micah

On Wed, Feb 10, 2021 at 12:14 AM Priyanshu LNU <t-...@microsoft.com> wrote:

> Hi,
>
> I am using Apache arrow in Java. I want to append stream of record batches
> to an already existing file.
>
> That means, I have a fileOutputStream. Using ArrowStreamWriter, I wrote
> some batches to this fileOutputStream. Now I close the fileOutputStream and
> then again start this stream to append some batches to it.
>
> The issue is that metadata gets appended again, so the streamreader is not
> able to read all the batches. So how Can I append stream of recordBatches
> to an existing file correctly without repetition of metadata in between?
>
>
>
> Below is part of Java code I am currently using:
>
>
>
> *public* *void* setupWrite(String filename, *boolean* useCustom) *throws*
> Exception {
>
>         File arrowFile = *validateFile*(filename, *false*);
>
>         *this*.fileOutputStream = *new* FileOutputStream(arrowFile);
>
>         Schema schema = makeSchema();
>
>         *this*.root = VectorSchemaRoot.*create*(schema, *this*.ra);
>
>         DictionaryProvider.MapDictionaryProvider provider = *new*
> DictionaryProvider.MapDictionaryProvider();
>
>         *this*.arrowStreamWriter=*new* ArrowStreamWriter(root,provider
> ,Channels.*newChannel*(*this*.fileOutputStream));
>
>
>
>         // Just to show some stuff about the schema and layout
>
>         System.*out*.println("Schema/Layout: ");
>
>         *for* (Field field : root.getSchema().getFields()) {
>
>             FieldVector vector = root.getVector(field.getName());
>
>             showFieldLayout(field, vector);
>
>         }
>
>         System.*out*.println("Generated " + *this*.entries + " data
> entries , batch size " + batchSize + " usingCustomWriter: " + useCustom + "
> useNullValues " + *this*.useNullValues);
>
>
>
>      // writing logic starts here
>
> //      arrowFileWriter.start();
>
>       arrowStreamWriter.start();
>
>
>
>       *for*(*int* i = 0; i < *this*.entries;) {
>
>           *int* toProcessItems = Math.*min*(*this*.batchSize, *this*.
> entries - i);
>
>           // set the batch row count
>
>           root.setRowCount(toProcessItems);
>
>           *for* (Field field : root.getSchema().getFields()) {
>
>               FieldVector vector = root.getVector(field.getName());
>
>              // System.out.println(vector.getMinorType());
>
>               *switch* (vector.getMinorType()) {
>
>                   *case* *INT*:
>
>                       writeFieldInt(vector, i, toProcessItems);
>
>                       *break*;
>
>                   *case* *BIGINT*:
>
>                       writeFieldLong(vector, i, toProcessItems);
>
>                       *break*;
>
>                   *case* *FLOAT4*:
>
>                       writeFieldFloat4(vector, i, toProcessItems);
>
>                       *break*;
>
>                   *case* *VARCHAR*:
>
>                       writeFieldVarchar(vector, i, toProcessItems);
>
>                       *break*;
>
>                   *case* *DATEDAY*:
>
>                       writeFieldDate(vector, i, toProcessItems);
>
>                       *break*;
>
>                // case VARBINARY:
>
>                  //   writeFieldVarBinary(vector, i, toProcessItems);
>
>                    // break;
>
>                   *default*:
>
>                       *throw* *new* Exception(" Not supported yet type: "
> + vector.getMinorType());
>
>               }
>
>           }
>
> //          arrowFileWriter.writeBatch();
>
>           arrowStreamWriter.writeBatch();
>
>           i+=toProcessItems;
>
>       }
>
>       arrowStreamWriter.end();
>
>       arrowStreamWriter.close();
>
>       fileOutputStream.flush();
>
>       fileOutputStream.close();
>
>
>
>
>
>
>
>
>
>
>
>
>
>       *this*.fileOutputStream = *new* FileOutputStream(arrowFile,*true*);
>
>       *this*.arrowStreamWriter=*new* ArrowStreamWriter(root,provider,
> *this*.fileOutputStream.getChannel());
>
>       arrowStreamWriter.start();
>
>       *for*(*int* i = 0; i < *this*.entries;) {
>
>           *int* toProcessItems = Math.*min*(*this*.batchSize, *this*.
> entries - i);
>
>           // set the batch row count
>
>           root.setRowCount(toProcessItems);
>
>           *for* (Field field : root.getSchema().getFields()) {
>
>               FieldVector vector = root.getVector(field.getName());
>
>              // System.out.println(vector.getMinorType());
>
>               *switch* (vector.getMinorType()) {
>
>                   *case* *INT*:
>
>                       writeFieldInt(vector, i, toProcessItems);
>
>                       *break*;
>
>                   *case* *BIGINT*:
>
>                       writeFieldLong(vector, i, toProcessItems);
>
>                       *break*;
>
>                   *case* *FLOAT4*:
>
>                       writeFieldFloat4(vector, i, toProcessItems);
>
>                       *break*;
>
>                   *case* *VARCHAR*:
>
>                       writeFieldVarchar(vector, i, toProcessItems);
>
>                       *break*;
>
>                   *case* *DATEDAY*:
>
>                       writeFieldDate(vector, i, toProcessItems);
>
>                       *break*;
>
>                // case VARBINARY:
>
>                  //   writeFieldVarBinary(vector, i, toProcessItems);
>
>                    // break;
>
>                   *default*:
>
>                       *throw* *new* Exception(" Not supported yet type: "
> + vector.getMinorType());
>
>               }
>
>           arrowStreamWriter.writeBatch();
>
>           i+=toProcessItems;
>
>       }
>
>       arrowStreamWriter.end();
>
>       arrowStreamWriter.close();
>
>       fileOutputStream.flush();
>
>       fileOutputStream.close();
>
>  }
>
>
>
>
>
> Thanks,
>
> Priyanshu
>
>
>