Posted to user@flink.apache.org by "Sharipov, Rinat" <r....@cleverdata.ru> on 2021/08/05 15:07:53 UTC

write into parquet with variable number of columns

Hi mates !

I'm trying to find the best way to persist data in a columnar format
(Parquet) using Flink.
Each event contains a fixed list of properties and a variable list of
properties defined by the user, and I would like to save the user-defined
properties into separate columns on the fly.

Here is an example of my events:

[
  {
    "eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
    "timestamp": 123,
    "attributes": {
      "gender": "male",
      "geo": "Germany"
    }
  },
  {
    "eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
    "timestamp": 123,
    "attributes": {
      "car": "big-cool-car",
      "phone": "iphone"
    }
  }
]

As a result, I would like to have a table with the following columns:

eventId | timestamp | gender | geo | car | phone

I've looked into the streaming file sink, but it requires defining a schema
before starting the job, which is not possible in my case (a rough sketch of
what I mean follows below).
I also remembered the explode SQL function that could help me with standard
SQL, but it doesn't exist in the Flink Table API.
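
Just to illustrate the schema problem (a rough sketch only; the Avro schema,
path and class name here are made up by me): with the Parquet bulk format the
schema has to be supplied when the sink is built, i.e. before the job starts,
so I see no place to add user-defined columns later.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class FixedSchemaSinkSketch {

  static StreamingFileSink<GenericRecord> buildSink() {
    // The Avro schema (and therefore the Parquet columns) must be known here,
    // at job construction time - there is no way to add columns per event.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"eventId\",\"type\":\"string\"},"
            + "{\"name\":\"timestamp\",\"type\":\"long\"}]}");

    return StreamingFileSink
        .forBulkFormat(new Path("/tmp/events"), ParquetAvroWriters.forGenericRecord(schema))
        .build();
  }
}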

I have found that since version 1.13.0 Flink supports creating rows by name
using Row.withNames(), so I guess this could be the key that solves my
problem. Here is what the Javadoc says:

Name-based field mode: withNames() creates a variable-length row. The
fields can be accessed by name using getField(String) and setField(String,
Object). Every field is initialized during the first call to
setField(String, Object) for the given name. However, the framework will
initialize missing fields with null and reorder all fields once more type
information is available during serialization or input conversion. Thus,
even name-based rows eventually become fixed-length composite types with a
deterministic field order. Name-based rows perform worse than
position-based rows but simplify row creation and code readability.

So it seems that all I need is to transform my events into records manually
and persist the resulting table into a file system, but my no-op demo
example fails with an exception. Here it is:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TestApp {

  public static void main(String[] args) {

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // First row: fields "a", "b", "c".
    Row row1 = Row.withNames();
    row1.setField("a", "fb1");
    row1.setField("b", "gmail1");
    row1.setField("c", "vk1");

    // Second row: only fields "b" and "c" are set.
    Row row2 = Row.withNames();
    row2.setField("b", "ok2");
    row2.setField("c", "gmail2");

    // Fails with the exception below.
    tableEnv.fromValues(row1, row2).printSchema();
  }
}

Here is a stack trace of the exception:

java.lang.IllegalArgumentException: Accessing a field by position is not supported in name-based field mode.

	at org.apache.flink.types.Row.getField(Row.java:257)
	at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
	at org.apache.flink.table.expressions.ApiExpressionUtils.convertRow(ApiExpressionUtils.java:123)
	at org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression(ApiExpressionUtils.java:103)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
	at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:359)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:334)


Maybe someone has tried this feature and can tell me what's wrong with
the current code and how to make it work.

Anyway I have a fallback: accumulate a batch of events, define the
schema for them and write them into the file system manually (a rough
sketch of that follows below), but I still hope that I can do this in
a more elegant way.
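
For completeness, here is roughly how that fallback could look (just a sketch
under my assumptions: parquet-avro and the Hadoop client are on the classpath,
and the batch is represented as a plain List of Maps only for illustration):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class BatchParquetFallbackSketch {

  // Writes one accumulated batch; the schema is derived from the union of
  // attribute names seen in that batch.
  @SuppressWarnings("unchecked")
  static void writeBatch(List<Map<String, Object>> events, String file) throws IOException {
    // Collect every user-defined attribute name present in the batch.
    SortedSet<String> attributeNames = new TreeSet<>();
    for (Map<String, Object> event : events) {
      attributeNames.addAll(((Map<String, String>) event.get("attributes")).keySet());
    }

    // Fixed columns plus one optional string column per attribute.
    SchemaBuilder.FieldAssembler<Schema> fields =
        SchemaBuilder.record("Event").fields()
            .requiredString("eventId")
            .requiredLong("timestamp");
    for (String name : attributeNames) {
      fields = fields.optionalString(name);
    }
    Schema schema = fields.endRecord();

    // Write the whole batch with the parquet-avro writer (Hadoop Path, not Flink's).
    try (ParquetWriter<GenericRecord> writer =
        AvroParquetWriter.<GenericRecord>builder(new Path(file)).withSchema(schema).build()) {
      for (Map<String, Object> event : events) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("eventId", event.get("eventId"));
        record.put("timestamp", ((Number) event.get("timestamp")).longValue());
        ((Map<String, String>) event.get("attributes")).forEach(record::put);
        writer.write(record);
      }
    }
  }
}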

Thx for your advice and time !


-- 
Best Regards,
Sharipov Rinat

CleverDATA
make your data clever