Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/22 01:25:47 UTC

[GitHub] [flink] meetjunsu opened a new pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

meetjunsu opened a new pull request #17542:
URL: https://github.com/apache/flink/pull/17542


   ## What is the purpose of the change
   
   In order to support complex types, this change adds three new writers to ParquetRowDataWriter: ArrayWriter, MapWriter, and RowWriter.
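
   A minimal sketch of that dispatch (simplified from the diff quoted later in this thread; imports and the surrounding class are assumed):

   ```java
   // Assumed imports: org.apache.flink.table.types.logical.LogicalType/ArrayType/
   // MapType/RowType and org.apache.parquet.schema.Type/GroupType.
   private FieldWriter createWriter(LogicalType t, Type type) {
       GroupType groupType = type.asGroupType();
       if (t instanceof ArrayType) {
           // One writer per element type; ArrayWriter wraps it in the LIST layout.
           return new ArrayWriter(((ArrayType) t).getElementType(), groupType);
       } else if (t instanceof MapType) {
           // Key and value each get their own writer inside the repeated key/value group.
           return new MapWriter(
                   ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType);
       } else if (t instanceof RowType) {
           // Nested rows recurse into createWriter for every field.
           return new RowWriter(t, groupType);
       }
       throw new UnsupportedOperationException("Unsupported type: " + t);
   }
   ```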
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (docs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131









[GitHub] [flink] meetjunsu removed a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu removed a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-958768513


   @flinkbot run azure





[GitHub] [flink] meetjunsu removed a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu removed a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-956078481









[GitHub] [flink] JingsongLi commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-983257963


   Sorry for the late response. I will review this in the next few days; I hope you are still active.





[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761667459



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +277,153 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(ArrayData arrayData, int ordinal) {
+            recordConsumer.addBinary(timestampToInt96(arrayData.getTimestamp(ordinal, precision)));
+        }
+    }
+
+    /** It writes a map field to parquet, both key and value are nullable. */
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    recordConsumer.startGroup();
+                    // write key element
+                    recordConsumer.startField(keyName, 0);
+                    keyWriter.write(keyArray, i);
+                    recordConsumer.endField(keyName, 0);
+                    // write value element
+                    recordConsumer.startField(valueName, 1);

Review comment:
       done







[GitHub] [flink] JingsongLi commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-986510346


   I created https://issues.apache.org/jira/browse/FLINK-25179 for the documentation; comment on it if you want to take it.





[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-986410927


   @flinkbot run azure





[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761699867



##########
File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
##########
@@ -158,6 +190,65 @@ private void innerTest(Configuration conf, boolean utcTimestamp) throws IOExcept
         Assert.assertEquals(number, cnt);
     }
 
+    public void complexTypeTest(Configuration conf, boolean utcTimestamp) throws Exception {
+        Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+        int number = 1000;
+        List<Row> rows = new ArrayList<>(number);
+        Map<String, String> mapData = new HashMap<>();
+        mapData.put("k1", "v1");
+        mapData.put(null, "v2");
+        mapData.put("k2", null);
+
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            rows.add(Row.of(new Integer[] {v}, mapData, Row.of(String.valueOf(v), v)));
+        }
+
+        ParquetWriterFactory<RowData> factory =
+                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE_COMPLEX, conf, utcTimestamp);
+        BulkWriter<RowData> writer =
+                factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+        for (int i = 0; i < number; i++) {
+            writer.addElement(CONVERTER_COMPLEX.toInternal(rows.get(i)));
+        }
+        writer.flush();
+        writer.finish();
+
+        File file = new File(path.getPath());
+        final List<Row> fileContent = readParquetFile(file);
+        assertEquals(rows, fileContent);
+    }
+
+    private static List<Row> readParquetFile(File file) throws IOException {
+        InputFile inFile =
+                HadoopInputFile.fromPath(
+                        new org.apache.hadoop.fs.Path(file.toURI()), new Configuration());
+
+        ArrayList<Row> results = new ArrayList<>();
+        try (ParquetReader<GenericRecord> reader =
+                AvroParquetReader.<GenericRecord>builder(inFile).build()) {
+            GenericRecord next;
+            while ((next = reader.read()) != null) {
+                Integer c0 = (Integer) ((ArrayList<GenericData.Record>) next.get(0)).get(0).get(0);
+                HashMap<Utf8, Utf8> map = ((HashMap<Utf8, Utf8>) next.get(1));
+                String c21 = ((GenericData.Record) next.get(2)).get(0).toString();
+                Integer c22 = (Integer) ((GenericData.Record) next.get(2)).get(1);
+
+                Map<String, String> c1 = new HashMap<>();
+                for (Utf8 key : map.keySet()) {
+                    String k = Strings.isEmpty(key) ? null : key.toString();

Review comment:
       I changed `Strings.isEmpty(key)` to `key == null`, and the tests passed.
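
       For reference, a hedged before/after of that line (`key` is the Avro `Utf8` map key from the test diff above):

       ```java
       // Before (from the diff): an empty Utf8 key was also mapped to null.
       //     String k = Strings.isEmpty(key) ? null : key.toString();
       // After, as described here: only a genuinely null key becomes null.
       String k = (key == null) ? null : key.toString();
       ```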







[GitHub] [flink] flinkbot commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949172629


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 9b11c30444ab1519ade5b7572a36d426739fc672 (Fri Oct 22 01:29:38 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] snuyanzin commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r735575324



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -126,13 +134,27 @@ private FieldWriter createWriter(LogicalType t, Type type) {
                     throw new UnsupportedOperationException("Unsupported type: " + type);
             }
         } else {
-            throw new IllegalArgumentException("Unsupported  data type: " + t);
+            GroupType groupType = type.asGroupType();
+            LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
+
+            if (t instanceof ArrayType
+                    && logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+                return new ArrayWriter(((ArrayType) t).getElementType(), groupType);
+            } else if (t instanceof MapType
+                    && logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
+                return new MapWriter(
+                        ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType);
+            } else {
+                return new RowWriter(t, groupType);

Review comment:
       As far as I know, there is also MultisetType; should it be treated here as a Row?
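
       For reference, a hedged sketch of one way to cover it (an assumption, not part of this PR): Flink stores a MULTISET internally as a map from element to its count, so MapWriter could be reused with a non-nullable INT value type.

       ```java
       // Hypothetical branch for createWriter, assuming MultisetType and IntType
       // from org.apache.flink.table.types.logical; not what the diff above does.
       if (t instanceof MultisetType) {
           return new MapWriter(
                   ((MultisetType) t).getElementType(), new IntType(false), groupType);
       }
       ```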







[GitHub] [flink] flinkbot commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9b11c30444ab1519ade5b7572a36d426739fc672 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] snuyanzin commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r735578442



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       Could value be `null` here?







[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * d3fc88995b30a57e2572047cfccae3ab61bfc9e5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-957024848









[GitHub] [flink] meetjunsu removed a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu removed a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-956078481


   @snuyanzin Could this PR be merged? I will add the corner test case in another PR about reading composite types, because the code has dependencies. I will submit it soon, and you can review it at that time.





[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * d3fc88995b30a57e2572047cfccae3ab61bfc9e5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760727340



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +293,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {
+                        // write key element
+                        recordConsumer.startField(keyName, 0);
+                        keyWriter.write(key);
+                        recordConsumer.endField(keyName, 0);
+
+                        // write value element
+                        if (value != null) {
+                            recordConsumer.startField(valueName, 1);
+                            valueWriter.write(value);
+                            recordConsumer.endField(valueName, 1);
+                        }
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class ArrayWriter implements FieldWriter {
+
+        private String elementName;
+        private FieldWriter elementWriter;
+        private String repeatedGroupName;
+        private ArrayData.ElementGetter elementGetter;
+
+        private ArrayWriter(LogicalType t, GroupType groupType) {
+
+            // Get the internal array structure
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            Type elementType = repeatedType.getType(0);
+            this.elementName = elementType.getName();
+
+            this.elementWriter = createWriter(t, elementType);
+            this.elementGetter = ArrayData.createElementGetter(t);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+            ArrayData arrayData = row.getArray(ordinal);
+            int listLength = arrayData.size();
+
+            if (listLength > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                for (int i = 0; i < listLength; i++) {
+                    Object object = elementGetter.getElementOrNull(arrayData, i);
+                    recordConsumer.startGroup();
+                    if (object != null) {
+                        recordConsumer.startField(elementName, 0);
+                        elementWriter.write(object);
+                        recordConsumer.endField(elementName, 0);
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class GroupTypeWriter implements FieldWriter {

Review comment:
       Why is `GroupTypeWriter` a separate class? Could it be merged with `RowWriter`?







[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 20b5850a68b1463d36796ccbfb8ea3eae5bf1221 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426) 
   * d0d2a946486ca5c1c021b3fb81b27870ca04ec29 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761651214



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +277,153 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(ArrayData arrayData, int ordinal) {
+            recordConsumer.addBinary(timestampToInt96(arrayData.getTimestamp(ordinal, precision)));
+        }
+    }
+
+    /** It writes a map field to parquet, both key and value are nullable. */
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    recordConsumer.startGroup();
+                    // write key element
+                    recordConsumer.startField(keyName, 0);

Review comment:
       It is dangerous to call `getXX` without `isNullAt` having returned false first. It may produce an NPE, or it may have an unexpected effect.
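
       A minimal sketch of the null-safe pattern (it matches the ElementGetter approach used in the revised diff quoted earlier in this thread):

       ```java
       // getElementOrNull performs the isNullAt check internally and returns null
       // instead of reading an undefined value or throwing an NPE.
       ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType);
       Object key = keyGetter.getElementOrNull(keyArray, i);
       if (key != null) {
           recordConsumer.startField(keyName, 0);
           keyWriter.write(key);
           recordConsumer.endField(keyName, 0);
       }
       ```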







[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421) 
   * d3fc88995b30a57e2572047cfccae3ab61bfc9e5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-958768513


   @flinkbot run azure





[GitHub] [flink] meetjunsu removed a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu removed a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-958768513









[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737202866



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       If the value is null, the `isNullAt` method has already been called before this point, so there is no need to write a null value here. Flink does not support reading composite types yet; I will add test cases later.
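
       In other words, a hedged sketch of the calling convention described here (names follow the revised diff in this thread):

       ```java
       // The element getter already yields null for null slots, and the caller
       // skips those, so write(Object) can cast without its own null check.
       Object value = elementGetter.getElementOrNull(arrayData, i);
       if (value != null) {
           elementWriter.write(value);   // e.g. recordConsumer.addBoolean((boolean) value)
       }
       ```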







[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d0d2a946486ca5c1c021b3fb81b27870ca04ec29 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760713947



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -126,13 +134,29 @@ private FieldWriter createWriter(LogicalType t, Type type) {
                     throw new UnsupportedOperationException("Unsupported type: " + type);
             }
         } else {
-            throw new IllegalArgumentException("Unsupported  data type: " + t);
+            GroupType groupType = type.asGroupType();
+            LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
+
+            if (t instanceof ArrayType
+                    && logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+                return new ArrayWriter(((ArrayType) t).getElementType(), groupType);
+            } else if (t instanceof MapType
+                    && logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
+                return new MapWriter(
+                        ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType);
+            } else if (t instanceof RowType && type instanceof GroupType) {
+                return new RowWriter(t, groupType);
+            } else {
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+            }
         }
     }
 
     private interface FieldWriter {
 
         void write(RowData row, int ordinal);
+
+        void write(Object value);

Review comment:
       Can we add a method `write(ArrayData row, int ordinal)` here?
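
       The resulting interface would look roughly like this (a sketch; the later revision quoted in this thread implements exactly this overload, e.g. in TimestampWriter):

       ```java
       private interface FieldWriter {

           void write(RowData row, int ordinal);

           // Suggested addition: write the element at `ordinal` of an ArrayData,
           // so array and map writers can reuse the same per-type writers.
           void write(ArrayData arrayData, int ordinal);
       }
       ```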







[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524",
       "triggerID" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fc6d9ef126185b79ec6edc6d4feedf0df95564f6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524) 
   * 45f8c8bfb9c3ff5fad9d66457ceed4a1551af392 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524",
       "triggerID" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570",
       "triggerID" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570",
       "triggerID" : "986374810",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * fc6d9ef126185b79ec6edc6d4feedf0df95564f6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524) 
   * 45f8c8bfb9c3ff5fad9d66457ceed4a1551af392 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524",
       "triggerID" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570",
       "triggerID" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570",
       "triggerID" : "986374810",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "986410927",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1b1e007b3b10145f38249f664a6b69f70da12206",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27573",
       "triggerID" : "986410927",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1b1e007b3b10145f38249f664a6b69f70da12206",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27573",
       "triggerID" : "1b1e007b3b10145f38249f664a6b69f70da12206",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 1b1e007b3b10145f38249f664a6b69f70da12206 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27573) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9b11c30444ab1519ade5b7572a36d426739fc672 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344) 
   * 6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-986507622


   @meetjunsu Can you create a PR to update this page: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/parquet/#data-type-mapping


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421) 
   * d3fc88995b30a57e2572047cfccae3ab61bfc9e5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu removed a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu removed a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-957121466


   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 0a653d3725ef573cc14bef5e06245e1c85dd662e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760714379



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -189,6 +243,11 @@ public void write(RowData row, int ordinal) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(Binary.fromReusedByteArray(row.getString(ordinal).toBytes()));

Review comment:
       `write(row.getString(ordinal))`? Same for the other writers.
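
    For illustration, a minimal sketch of that delegation inside `StringWriter`, assuming a private helper named `writeString` (the helper name is illustrative, not taken from the patch):
    ```
    @Override
    public void write(RowData row, int ordinal) {
        writeString(row.getString(ordinal));
    }

    @Override
    public void write(ArrayData array, int ordinal) {
        writeString(array.getString(ordinal));
    }

    // Both overloads funnel into one place that emits the UTF-8 bytes.
    private void writeString(StringData value) {
        recordConsumer.addBinary(Binary.fromReusedByteArray(value.toBytes()));
    }
    ```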




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760997452



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -126,13 +134,29 @@ private FieldWriter createWriter(LogicalType t, Type type) {
                     throw new UnsupportedOperationException("Unsupported type: " + type);
             }
         } else {
-            throw new IllegalArgumentException("Unsupported  data type: " + t);
+            GroupType groupType = type.asGroupType();
+            LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
+
+            if (t instanceof ArrayType
+                    && logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+                return new ArrayWriter(((ArrayType) t).getElementType(), groupType);
+            } else if (t instanceof MapType
+                    && logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
+                return new MapWriter(
+                        ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType);
+            } else if (t instanceof RowType && type instanceof GroupType) {
+                return new RowWriter(t, groupType);
+            } else {
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+            }
         }
     }
 
     private interface FieldWriter {
 
         void write(RowData row, int ordinal);
+
+        void write(Object value);

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761000688



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
##########
@@ -101,11 +110,32 @@ private static Type convertToParquetType(
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
                         .named(name);
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) type;
+                return ConversionPatterns.listOfElements(
+                        repetition,
+                        name,
+                        convertToParquetType(LIST_ELEMENT_NAME, arrayType.getElementType()));
+            case MAP:
+                MapType mapType = (MapType) type;
+                return ConversionPatterns.stringKeyMapType(

Review comment:
       Supports multiple key types in the latest commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761612653



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -165,6 +187,11 @@ public void write(RowData row, int ordinal) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addLong(row.getLong(ordinal));
         }
+
+        @Override
+        public void write(ArrayData arrayData, int ordinal) {

Review comment:
       It is better to add a method:
   ```
   private void writeLong(long value) {
      recordConsumer.addLong(value);
   }
   ```
    Then these two methods can reuse the logic.
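
    A sketch of how the two overloads could then delegate, assuming the `write(ArrayData, int)` overload added in this patch:
    ```
    @Override
    public void write(RowData row, int ordinal) {
        writeLong(row.getLong(ordinal));
    }

    @Override
    public void write(ArrayData array, int ordinal) {
        writeLong(array.getLong(ordinal));
    }

    // Single place that performs the actual Parquet write.
    private void writeLong(long value) {
        recordConsumer.addLong(value);
    }
    ```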

##########
File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
##########
@@ -158,6 +190,65 @@ private void innerTest(Configuration conf, boolean utcTimestamp) throws IOExcept
         Assert.assertEquals(number, cnt);
     }
 
+    public void complexTypeTest(Configuration conf, boolean utcTimestamp) throws Exception {
+        Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+        int number = 1000;
+        List<Row> rows = new ArrayList<>(number);
+        Map<String, String> mapData = new HashMap<>();
+        mapData.put("k1", "v1");
+        mapData.put(null, "v2");
+        mapData.put("k2", null);
+
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            rows.add(Row.of(new Integer[] {v}, mapData, Row.of(String.valueOf(v), v)));
+        }
+
+        ParquetWriterFactory<RowData> factory =
+                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE_COMPLEX, conf, utcTimestamp);
+        BulkWriter<RowData> writer =
+                factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+        for (int i = 0; i < number; i++) {
+            writer.addElement(CONVERTER_COMPLEX.toInternal(rows.get(i)));
+        }
+        writer.flush();
+        writer.finish();
+
+        File file = new File(path.getPath());
+        final List<Row> fileContent = readParquetFile(file);
+        assertEquals(rows, fileContent);
+    }
+
+    private static List<Row> readParquetFile(File file) throws IOException {
+        InputFile inFile =
+                HadoopInputFile.fromPath(
+                        new org.apache.hadoop.fs.Path(file.toURI()), new Configuration());
+
+        ArrayList<Row> results = new ArrayList<>();
+        try (ParquetReader<GenericRecord> reader =
+                AvroParquetReader.<GenericRecord>builder(inFile).build()) {
+            GenericRecord next;
+            while ((next = reader.read()) != null) {
+                Integer c0 = (Integer) ((ArrayList<GenericData.Record>) next.get(0)).get(0).get(0);
+                HashMap<Utf8, Utf8> map = ((HashMap<Utf8, Utf8>) next.get(1));
+                String c21 = ((GenericData.Record) next.get(2)).get(0).toString();
+                Integer c22 = (Integer) ((GenericData.Record) next.get(2)).get(1);
+
+                Map<String, String> c1 = new HashMap<>();
+                for (Utf8 key : map.keySet()) {
+                    String k = Strings.isEmpty(key) ? null : key.toString();

Review comment:
       Writing null returns an empty string? Is this expected?

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +277,153 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(ArrayData arrayData, int ordinal) {
+            recordConsumer.addBinary(timestampToInt96(arrayData.getTimestamp(ordinal, precision)));
+        }
+    }
+
+    /** It writes a map field to parquet, both key and value are nullable. */
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    recordConsumer.startGroup();
+                    // write key element
+                    recordConsumer.startField(keyName, 0);
+                    keyWriter.write(keyArray, i);
+                    recordConsumer.endField(keyName, 0);
+                    // write value element
+                    recordConsumer.startField(valueName, 1);

Review comment:
       Maybe there should be `if (!valueArray.isNullAt(i))`?
   
   

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +277,153 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(ArrayData arrayData, int ordinal) {
+            recordConsumer.addBinary(timestampToInt96(arrayData.getTimestamp(ordinal, precision)));
+        }
+    }
+
+    /** It writes a map field to parquet, both key and value are nullable. */
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    recordConsumer.startGroup();
+                    // write key element
+                    recordConsumer.startField(keyName, 0);

Review comment:
       Maybe there should be `if (!keyArray.isNullAt(i))`?
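
    Taken together with the value-side comment above, a sketch of the guarded loop body (field names and writers as declared in `MapWriter`; this follows the review suggestion, not the final merged code):
    ```
    for (int i = 0; i < keyArray.size(); i++) {
        recordConsumer.startGroup();
        // Emit the key field only for non-null keys.
        if (!keyArray.isNullAt(i)) {
            recordConsumer.startField(keyName, 0);
            keyWriter.write(keyArray, i);
            recordConsumer.endField(keyName, 0);
        }
        // Likewise, skip the value field for null values.
        if (!valueArray.isNullAt(i)) {
            recordConsumer.startField(valueName, 1);
            valueWriter.write(valueArray, i);
            recordConsumer.endField(valueName, 1);
        }
        recordConsumer.endGroup();
    }
    ```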




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524",
       "triggerID" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d0d2a946486ca5c1c021b3fb81b27870ca04ec29 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491) 
   * fc6d9ef126185b79ec6edc6d4feedf0df95564f6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-953512858


   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-952456458


   @snuyanzin Updated in the latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737241851



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       Flink does not support reading composite types right now; could I add the test cases later?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9b11c30444ab1519ade5b7572a36d426739fc672 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344) 
   * 6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu removed a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu removed a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-956078481






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu removed a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu removed a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-958768513


   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524",
       "triggerID" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570",
       "triggerID" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570",
       "triggerID" : "986374810",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "986410927",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1b1e007b3b10145f38249f664a6b69f70da12206",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27573",
       "triggerID" : "986410927",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1b1e007b3b10145f38249f664a6b69f70da12206",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27573",
       "triggerID" : "1b1e007b3b10145f38249f664a6b69f70da12206",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 45f8c8bfb9c3ff5fad9d66457ceed4a1551af392 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570) 
   * 1b1e007b3b10145f38249f664a6b69f70da12206 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27573) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524",
       "triggerID" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570",
       "triggerID" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570",
       "triggerID" : "986374810",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1b1e007b3b10145f38249f664a6b69f70da12206",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27573",
       "triggerID" : "986410927",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1b1e007b3b10145f38249f664a6b69f70da12206",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27573",
       "triggerID" : "1b1e007b3b10145f38249f664a6b69f70da12206",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "986410927",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 1b1e007b3b10145f38249f664a6b69f70da12206 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27573) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi merged pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #17542:
URL: https://github.com/apache/flink/pull/17542


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761700468



##########
File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
##########
@@ -158,6 +190,65 @@ private void innerTest(Configuration conf, boolean utcTimestamp) throws IOExcept
         Assert.assertEquals(number, cnt);
     }
 
+    public void complexTypeTest(Configuration conf, boolean utcTimestamp) throws Exception {
+        Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+        int number = 1000;
+        List<Row> rows = new ArrayList<>(number);
+        Map<String, String> mapData = new HashMap<>();
+        mapData.put("k1", "v1");
+        mapData.put(null, "v2");
+        mapData.put("k2", null);
+
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            rows.add(Row.of(new Integer[] {v}, mapData, Row.of(String.valueOf(v), v)));
+        }
+
+        ParquetWriterFactory<RowData> factory =
+                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE_COMPLEX, conf, utcTimestamp);
+        BulkWriter<RowData> writer =
+                factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+        for (int i = 0; i < number; i++) {
+            writer.addElement(CONVERTER_COMPLEX.toInternal(rows.get(i)));
+        }
+        writer.flush();
+        writer.finish();
+
+        File file = new File(path.getPath());
+        final List<Row> fileContent = readParquetFile(file);
+        assertEquals(rows, fileContent);
+    }
+
+    private static List<Row> readParquetFile(File file) throws IOException {
+        InputFile inFile =
+                HadoopInputFile.fromPath(
+                        new org.apache.hadoop.fs.Path(file.toURI()), new Configuration());
+
+        ArrayList<Row> results = new ArrayList<>();
+        try (ParquetReader<GenericRecord> reader =
+                AvroParquetReader.<GenericRecord>builder(inFile).build()) {
+            GenericRecord next;
+            while ((next = reader.read()) != null) {
+                Integer c0 = (Integer) ((ArrayList<GenericData.Record>) next.get(0)).get(0).get(0);
+                HashMap<Utf8, Utf8> map = ((HashMap<Utf8, Utf8>) next.get(1));
+                String c21 = ((GenericData.Record) next.get(2)).get(0).toString();
+                Integer c22 = (Integer) ((GenericData.Record) next.get(2)).get(1);
+
+                Map<String, String> c1 = new HashMap<>();
+                for (Utf8 key : map.keySet()) {
+                    String k = Strings.isEmpty(key) ? null : key.toString();

Review comment:
       > avro GenericRecord will auto convert null to empty string.
   
   Do you have a reference stating this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0a653d3725ef573cc14bef5e06245e1c85dd662e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865) 
   * 20b5850a68b1463d36796ccbfb8ea3eae5bf1221 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760724189



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +293,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {
+                        // write key element
+                        recordConsumer.startField(keyName, 0);
+                        keyWriter.write(key);
+                        recordConsumer.endField(keyName, 0);
+
+                        // write value element
+                        if (value != null) {
+                            recordConsumer.startField(valueName, 1);
+                            valueWriter.write(value);
+                            recordConsumer.endField(valueName, 1);
+                        }
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class ArrayWriter implements FieldWriter {

Review comment:
       Add some documentation?
   e.g. `a 3-level list structure annotated with LIST with elements...`
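
    For context, the 3-level structure referred to here can be sketched as follows (a sketch only; the `list`/`element` names follow the Parquet spec's recommendation and the `ConversionPatterns.listOfElements` helper this PR uses, not necessarily the exact schema the PR emits):

    ```java
    import org.apache.parquet.schema.ConversionPatterns;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class ListSchemaSketch {
        // Builds the standard 3-level list:
        //   optional group my_list (LIST) {
        //     repeated group list {
        //       optional int32 element;
        //     }
        //   }
        public static Type threeLevelList() {
            Type element =
                    Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("element");
            return ConversionPatterns.listOfElements(
                    Type.Repetition.OPTIONAL, "my_list", element);
        }
    }
    ```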




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760727608



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +293,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {
+                        // write key element
+                        recordConsumer.startField(keyName, 0);
+                        keyWriter.write(key);
+                        recordConsumer.endField(keyName, 0);
+
+                        // write value element
+                        if (value != null) {
+                            recordConsumer.startField(valueName, 1);
+                            valueWriter.write(value);
+                            recordConsumer.endField(valueName, 1);
+                        }
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class ArrayWriter implements FieldWriter {
+
+        private String elementName;
+        private FieldWriter elementWriter;
+        private String repeatedGroupName;
+        private ArrayData.ElementGetter elementGetter;
+
+        private ArrayWriter(LogicalType t, GroupType groupType) {
+
+            // Get the internal array structure
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            Type elementType = repeatedType.getType(0);
+            this.elementName = elementType.getName();
+
+            this.elementWriter = createWriter(t, elementType);
+            this.elementGetter = ArrayData.createElementGetter(t);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+            ArrayData arrayData = row.getArray(ordinal);
+            int listLength = arrayData.size();
+
+            if (listLength > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                for (int i = 0; i < listLength; i++) {
+                    Object object = elementGetter.getElementOrNull(arrayData, i);
+                    recordConsumer.startGroup();
+                    if (object != null) {
+                        recordConsumer.startField(elementName, 0);
+                        elementWriter.write(object);
+                        recordConsumer.endField(elementName, 0);
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class GroupTypeWriter implements FieldWriter {

Review comment:
       Can `ParquetRowDataWriter.write` reuse this class?
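
    A sketch of the kind of reuse being suggested (hypothetical names, not the PR's final code): the message-level write path and the nested-row (GroupType) writer could share one per-field loop.

    ```java
    import org.apache.flink.table.data.RowData;
    import org.apache.parquet.io.api.RecordConsumer;

    class RowWriterSketch {
        interface FieldWriter {
            void write(RowData row, int ordinal);
        }

        private final RecordConsumer recordConsumer;
        private final String[] fieldNames;
        private final FieldWriter[] fieldWriters;

        RowWriterSketch(RecordConsumer consumer, String[] names, FieldWriter[] writers) {
            this.recordConsumer = consumer;
            this.fieldNames = names;
            this.fieldWriters = writers;
        }

        // Shared per-field loop: usable inside startGroup()/endGroup() for a
        // nested ROW and inside startMessage()/endMessage() at the top level.
        void writeFields(RowData row) {
            for (int i = 0; i < fieldWriters.length; i++) {
                if (!row.isNullAt(i)) {
                    recordConsumer.startField(fieldNames[i], i);
                    fieldWriters[i].write(row, i);
                    recordConsumer.endField(fieldNames[i], i);
                }
            }
        }

        // Top-level entry point reusing the same loop.
        void writeMessage(RowData record) {
            recordConsumer.startMessage();
            writeFields(record);
            recordConsumer.endMessage();
        }
    }
    ```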




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761007129



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +293,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {
+                        // write key element
+                        recordConsumer.startField(keyName, 0);
+                        keyWriter.write(key);
+                        recordConsumer.endField(keyName, 0);
+
+                        // write value element
+                        if (value != null) {
+                            recordConsumer.startField(valueName, 1);
+                            valueWriter.write(value);
+                            recordConsumer.endField(valueName, 1);
+                        }
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class ArrayWriter implements FieldWriter {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761004060



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -189,6 +243,11 @@ public void write(RowData row, int ordinal) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(Binary.fromReusedByteArray(row.getString(ordinal).toBytes()));

Review comment:
       We don't have a `write(Object value)` method anymore.
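
    For reference, the refactored shape is roughly the following (a hypothetical sketch; the `ArrayData` overload is taken from a later revision of this PR):

    ```java
    import org.apache.flink.table.data.ArrayData;
    import org.apache.flink.table.data.RowData;

    interface FieldWriter {
        // Read the value straight out of a row.
        void write(RowData row, int ordinal);

        // Read the value straight out of an array (used by ArrayWriter/MapWriter).
        void write(ArrayData array, int ordinal);
    }
    ```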




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761628631



##########
File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
##########
@@ -158,6 +190,65 @@ private void innerTest(Configuration conf, boolean utcTimestamp) throws IOExcept
         Assert.assertEquals(number, cnt);
     }
 
+    public void complexTypeTest(Configuration conf, boolean utcTimestamp) throws Exception {
+        Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+        int number = 1000;
+        List<Row> rows = new ArrayList<>(number);
+        Map<String, String> mapData = new HashMap<>();
+        mapData.put("k1", "v1");
+        mapData.put(null, "v2");
+        mapData.put("k2", null);
+
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            rows.add(Row.of(new Integer[] {v}, mapData, Row.of(String.valueOf(v), v)));
+        }
+
+        ParquetWriterFactory<RowData> factory =
+                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE_COMPLEX, conf, utcTimestamp);
+        BulkWriter<RowData> writer =
+                factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+        for (int i = 0; i < number; i++) {
+            writer.addElement(CONVERTER_COMPLEX.toInternal(rows.get(i)));
+        }
+        writer.flush();
+        writer.finish();
+
+        File file = new File(path.getPath());
+        final List<Row> fileContent = readParquetFile(file);
+        assertEquals(rows, fileContent);
+    }
+
+    private static List<Row> readParquetFile(File file) throws IOException {
+        InputFile inFile =
+                HadoopInputFile.fromPath(
+                        new org.apache.hadoop.fs.Path(file.toURI()), new Configuration());
+
+        ArrayList<Row> results = new ArrayList<>();
+        try (ParquetReader<GenericRecord> reader =
+                AvroParquetReader.<GenericRecord>builder(inFile).build()) {
+            GenericRecord next;
+            while ((next = reader.read()) != null) {
+                Integer c0 = (Integer) ((ArrayList<GenericData.Record>) next.get(0)).get(0).get(0);
+                HashMap<Utf8, Utf8> map = ((HashMap<Utf8, Utf8>) next.get(1));
+                String c21 = ((GenericData.Record) next.get(2)).get(0).toString();
+                Integer c22 = (Integer) ((GenericData.Record) next.get(2)).get(1);
+
+                Map<String, String> c1 = new HashMap<>();
+                for (Utf8 key : map.keySet()) {
+                    String k = Strings.isEmpty(key) ? null : key.toString();

Review comment:
       The key is nullable; Avro's GenericRecord automatically converts a null key to an empty string.
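
    A minimal sketch of the mapping described above (assuming Avro `Utf8` keys and the empty-string convention; an illustrative helper, not PR code):

    ```java
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.util.Utf8;

    class AvroMapSketch {
        static Map<String, String> toJavaMap(Map<Utf8, Utf8> avroMap) {
            Map<String, String> result = new HashMap<>();
            for (Map.Entry<Utf8, Utf8> e : avroMap.entrySet()) {
                // Avro materialized a null key as "", so map it back to null.
                Utf8 key = e.getKey();
                String k = (key == null || key.length() == 0) ? null : key.toString();
                Utf8 value = e.getValue();
                String v = value == null ? null : value.toString();
                result.put(k, v);
            }
            return result;
        }
    }
    ```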




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949188527


   @JingsongLi could you help review this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-958768513


   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 0a653d3725ef573cc14bef5e06245e1c85dd662e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-960418603


   Rebased onto the latest master branch to fix the Python test errors.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737202866



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       If the value is `null`, the `isNullAt` method has already been called beforehand, so there is no need to write a null value here. Flink does not support reading composite types yet; I will add test cases later.
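
    In other words, the contract looks roughly like this (an illustrative sketch; names follow the PR's diffs):

    ```java
    import org.apache.flink.table.data.RowData;
    import org.apache.parquet.io.api.RecordConsumer;

    class NullContractSketch {
        interface FieldWriter {
            void write(RowData row, int ordinal);
        }

        // The null check happens at the call site, so writers never see null.
        void writeField(
                RecordConsumer recordConsumer,
                RowData row,
                int ordinal,
                String fieldName,
                FieldWriter fieldWriter) {
            if (!row.isNullAt(ordinal)) {
                recordConsumer.startField(fieldName, ordinal);
                fieldWriter.write(row, ordinal);
                recordConsumer.endField(fieldName, ordinal);
            }
            // If the field was null, nothing is written here; Parquet records
            // the null when the enclosing group/message is closed.
        }
    }
    ```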




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 0a653d3725ef573cc14bef5e06245e1c85dd662e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9b11c30444ab1519ade5b7572a36d426739fc672 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9b11c30444ab1519ade5b7572a36d426739fc672 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760999986



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +293,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {
+                        // write key element
+                        recordConsumer.startField(keyName, 0);
+                        keyWriter.write(key);
+                        recordConsumer.endField(keyName, 0);
+
+                        // write value element
+                        if (value != null) {
+                            recordConsumer.startField(valueName, 1);
+                            valueWriter.write(value);
+                            recordConsumer.endField(valueName, 1);
+                        }
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class ArrayWriter implements FieldWriter {
+
+        private String elementName;
+        private FieldWriter elementWriter;
+        private String repeatedGroupName;
+        private ArrayData.ElementGetter elementGetter;
+
+        private ArrayWriter(LogicalType t, GroupType groupType) {
+
+            // Get the internal array structure
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            Type elementType = repeatedType.getType(0);
+            this.elementName = elementType.getName();
+
+            this.elementWriter = createWriter(t, elementType);
+            this.elementGetter = ArrayData.createElementGetter(t);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+            ArrayData arrayData = row.getArray(ordinal);
+            int listLength = arrayData.size();
+
+            if (listLength > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                for (int i = 0; i < listLength; i++) {
+                    Object object = elementGetter.getElementOrNull(arrayData, i);
+                    recordConsumer.startGroup();
+                    if (object != null) {
+                        recordConsumer.startField(elementName, 0);
+                        elementWriter.write(object);
+                        recordConsumer.endField(elementName, 0);
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class GroupTypeWriter implements FieldWriter {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737239171



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +291,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {
+                        // write key element
+                        recordConsumer.startField(keyName, 0);
+                        keyWriter.write(key);
+                        recordConsumer.endField(keyName, 0);
+
+                        // write value element
+                        if (value != null) {
+                            recordConsumer.startField(valueName, 1);
+                            valueWriter.write(value);
+                            recordConsumer.endField(valueName, 1);
+                        }
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class ArrayWriter implements FieldWriter {
+
+        private String elementName;
+        private FieldWriter elementWriter;
+        private String repeatedGroupName;
+        private ArrayData.ElementGetter elementGetter;
+
+        private ArrayWriter(LogicalType t, GroupType groupType) {
+
+            // Get the internal array structure
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            Type elementType = repeatedType.getType(0);
+            this.elementName = elementType.getName();
+
+            this.elementWriter = createWriter(t, elementType);
+            this.elementGetter = ArrayData.createElementGetter(t);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+            ArrayData arrayData = row.getArray(ordinal);
+            int listLength = arrayData.size();
+
+            if (listLength > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                for (int i = 0; i < listLength; i++) {
+                    Object object = elementGetter.getElementOrNull(arrayData, i);
+                    recordConsumer.startGroup();
+                    if (object != null) {
+                        recordConsumer.startField(elementName, 0);
+                        elementWriter.write(object);
+                        recordConsumer.endField(elementName, 0);
+                    }
+                    recordConsumer.endGroup();

Review comment:
       If the object is `null`, `writeNullForMissingFieldsAtCurrentLevel()` will be called from the `endGroup()` method, so we can simply skip writing null elements in the previous step.
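
    As an illustration, this is roughly what the `RecordConsumer` sees for the array `[1, null, 2]` (assuming the spec's `list`/`element` names; a sketch, not PR code):

    ```java
    import org.apache.parquet.io.api.RecordConsumer;

    class ArrayTraceSketch {
        void trace(RecordConsumer recordConsumer) {
            recordConsumer.startField("list", 0);

            recordConsumer.startGroup();               // element 0
            recordConsumer.startField("element", 0);
            recordConsumer.addInteger(1);
            recordConsumer.endField("element", 0);
            recordConsumer.endGroup();

            recordConsumer.startGroup();               // element 1 is null: no field written
            recordConsumer.endGroup();                 // the null is materialized here

            recordConsumer.startGroup();               // element 2
            recordConsumer.startField("element", 0);
            recordConsumer.addInteger(2);
            recordConsumer.endField("element", 0);
            recordConsumer.endGroup();

            recordConsumer.endField("list", 0);
        }
    }
    ```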




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760725040



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
##########
@@ -101,11 +110,32 @@ private static Type convertToParquetType(
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
                         .named(name);
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) type;
+                return ConversionPatterns.listOfElements(
+                        repetition,
+                        name,
+                        convertToParquetType(LIST_ELEMENT_NAME, arrayType.getElementType()));
+            case MAP:
+                MapType mapType = (MapType) type;
+                return ConversionPatterns.stringKeyMapType(

Review comment:
       Must the key be a string?
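
    For non-string keys, one alternative could look like this (hedged: `ConversionPatterns.mapType` exists in parquet-column, though it may be deprecated depending on the version; names here are illustrative):

    ```java
    import org.apache.parquet.schema.ConversionPatterns;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Type;
    import org.apache.parquet.schema.Types;

    public class MapSchemaSketch {
        // Builds a MAP group with an int32 key instead of forcing a UTF8
        // string key via stringKeyMapType.
        public static Type intKeyMap() {
            Type key =
                    Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("key");
            Type value =
                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("value");
            return ConversionPatterns.mapType(Type.Repetition.OPTIONAL, "my_map", key, value);
        }
    }
    ```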




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524",
       "triggerID" : "fc6d9ef126185b79ec6edc6d4feedf0df95564f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570",
       "triggerID" : "45f8c8bfb9c3ff5fad9d66457ceed4a1551af392",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fc6d9ef126185b79ec6edc6d4feedf0df95564f6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524) 
   * 45f8c8bfb9c3ff5fad9d66457ceed4a1551af392 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761667191



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -165,6 +187,11 @@ public void write(RowData row, int ordinal) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addLong(row.getLong(ordinal));
         }
+
+        @Override
+        public void write(ArrayData arrayData, int ordinal) {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "957121466",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865",
       "triggerID" : "0a653d3725ef573cc14bef5e06245e1c85dd662e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "958768513",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426",
       "triggerID" : "20b5850a68b1463d36796ccbfb8ea3eae5bf1221",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491",
       "triggerID" : "d0d2a946486ca5c1c021b3fb81b27870ca04ec29",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 20b5850a68b1463d36796ccbfb8ea3eae5bf1221 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426) 
   * d0d2a946486ca5c1c021b3fb81b27870ca04ec29 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25344",
       "triggerID" : "9b11c30444ab1519ade5b7572a36d426739fc672",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421",
       "triggerID" : "6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698",
       "triggerID" : "d3fc88995b30a57e2572047cfccae3ab61bfc9e5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d3fc88995b30a57e2572047cfccae3ab61bfc9e5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-957024848


   @JingsongLi  Would you like to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] snuyanzin commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737187615



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       Maybe I missed it, but could we add a test checking such corner cases?
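
    Something along these lines would cover it (a sketch of the test data only; the PR's `complexTypeTest` later exercises exactly these cases):

    ```java
    // Corner-case map data: a null key and a null value.
    Map<String, String> mapData = new HashMap<>();
    mapData.put("k1", "v1");
    mapData.put(null, "v2");
    mapData.put("k2", null);
    ```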




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] snuyanzin commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737254590



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       I think so; however, I would log this issue in Jira (in case it is not logged yet).
   







[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737202866



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       If the value is `null`, the `isNullAt` method has already been called beforehand, so there is no need to write a null value here. Flink does not support reading composite types yet; I will add test cases later.
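
   For context, the calling pattern looks roughly like this (a simplified sketch of the dispatch, not the actual `ParquetRowDataWriter` code):

   ```java
   import org.apache.flink.table.data.RowData;
   import org.apache.parquet.io.api.RecordConsumer;

   final class NullCheckSketch {

       interface FieldWriter {
           void write(RowData row, int ordinal);
       }

       // The null check happens before the typed write call, so the typed
       // writers can read/cast without their own null guards.
       static void writeField(RowData row, int ordinal, String fieldName,
                              FieldWriter writer, RecordConsumer consumer) {
           if (!row.isNullAt(ordinal)) {
               consumer.startField(fieldName, ordinal);
               writer.write(row, ordinal);
               consumer.endField(fieldName, ordinal);
           }
           // For a null field nothing is emitted; Parquet materializes the
           // unwritten optional field as null.
       }
   }
   ```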







[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737235601



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       If the value is `null`, the `write()` method is skipped here, but the `RecordConsumer` will automatically record a null value when `endGroup()` is called.
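
   Concretely (a hedged sketch against Parquet's `RecordConsumer` API; `recordConsumer` and the field names stand in for the writer's state, and `Binary` is `org.apache.parquet.io.api.Binary`):

   ```java
   // Writing one map entry whose value is null: only the key field is
   // emitted. On endGroup(), Parquet records the unwritten optional
   // "value" field of this group as null.
   recordConsumer.startGroup();
   recordConsumer.startField("key", 0);
   recordConsumer.addBinary(Binary.fromString("k1"));
   recordConsumer.endField("key", 0);
   // no startField/endField for "value" -> null value in the file
   recordConsumer.endGroup();
   ```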







[GitHub] [flink] JingsongLi commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-986374810


   @flinkbot run azure





[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760999864



##########
File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
##########
@@ -120,15 +144,18 @@ private void innerTest(Configuration conf, boolean utcTimestamp) throws IOExcept
                             toDateTime(v),
                             BigDecimal.valueOf(v),
                             BigDecimal.valueOf(v),
-                            BigDecimal.valueOf(v)));
+                            BigDecimal.valueOf(v),
+                            new Integer[] {v},
+                            mapData,
+                            Row.of(String.valueOf(v), v)));
         }
 
         ParquetWriterFactory<RowData> factory =
-                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE, conf, utcTimestamp);
+                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE_COMPLEX, conf, utcTimestamp);
         BulkWriter<RowData> writer =
                 factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
         for (int i = 0; i < number; i++) {
-            writer.addElement(CONVERTER.toInternal(rows.get(i)));
+            writer.addElement(CONVERTER_COMPLEX.toInternal(rows.get(i)));
         }
         writer.flush();
         writer.finish();

Review comment:
       Added the test function `complexTypeTest`.







[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761629258



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +277,153 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(ArrayData arrayData, int ordinal) {
+            recordConsumer.addBinary(timestampToInt96(arrayData.getTimestamp(ordinal, precision)));
+        }
+    }
+
+    /** It writes a map field to parquet, both key and value are nullable. */
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    recordConsumer.startGroup();
+                    // write key element
+                    recordConsumer.startField(keyName, 0);

Review comment:
       Updated as you suggested, though the current version does not affect the results either way.







[GitHub] [flink] JingsongLi commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-984249951


   @meetjunsu Thanks for the contribution. Left some comments.





[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737205891



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +291,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {
+                        // write key element
+                        recordConsumer.startField(keyName, 0);
+                        keyWriter.write(key);
+                        recordConsumer.endField(keyName, 0);
+
+                        // write value element
+                        if (value != null) {
+                            recordConsumer.startField(valueName, 1);
+                            valueWriter.write(value);
+                            recordConsumer.endField(valueName, 1);
+                        }
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class ArrayWriter implements FieldWriter {
+
+        private String elementName;
+        private FieldWriter elementWriter;
+        private String repeatedGroupName;
+        private ArrayData.ElementGetter elementGetter;
+
+        private ArrayWriter(LogicalType t, GroupType groupType) {
+
+            // Get the internal array structure
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            Type elementType = repeatedType.getType(0);
+            this.elementName = elementType.getName();
+
+            this.elementWriter = createWriter(t, elementType);
+            this.elementGetter = ArrayData.createElementGetter(t);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+            ArrayData arrayData = row.getArray(ordinal);
+            int listLength = arrayData.size();
+
+            if (listLength > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                for (int i = 0; i < listLength; i++) {
+                    Object object = elementGetter.getElementOrNull(arrayData, i);

Review comment:
       The `isNullAt` check has already been done here, inside `getElementOrNull`.
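
   For reference, the getter returned by `ArrayData.createElementGetter` performs that check itself; roughly (a sketch of the contract, not the Flink implementation):

   ```java
   import org.apache.flink.table.data.ArrayData;
   import org.apache.flink.table.types.logical.LogicalType;

   final class ElementGetterSketch {
       // For a nullable element type, the generated getter effectively does
       //   return array.isNullAt(pos) ? null : <typed read of pos>;
       // so callers receive null instead of dereferencing a null slot.
       static Object readOrNull(ArrayData array, int pos, LogicalType elementType) {
           ArrayData.ElementGetter getter = ArrayData.createElementGetter(elementType);
           return getter.getElementOrNull(array, pos);
       }
   }
   ```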







[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737206308



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +291,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);

Review comment:
       The `isNullAt` check has already been done here, inside `getElementOrNull`.







[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760725974



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +293,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {

Review comment:
       Is the key nullable?







[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   ## CI report:
   
   * 0a653d3725ef573cc14bef5e06245e1c85dd662e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25865) 
   * 20b5850a68b1463d36796ccbfb8ea3eae5bf1221 UNKNOWN
   





[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760999104



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +293,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {
+                        // write key element
+                        recordConsumer.startField(keyName, 0);
+                        keyWriter.write(key);
+                        recordConsumer.endField(keyName, 0);
+
+                        // write value element
+                        if (value != null) {
+                            recordConsumer.startField(valueName, 1);
+                            valueWriter.write(value);
+                            recordConsumer.endField(valueName, 1);
+                        }
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class ArrayWriter implements FieldWriter {
+
+        private String elementName;
+        private FieldWriter elementWriter;
+        private String repeatedGroupName;
+        private ArrayData.ElementGetter elementGetter;
+
+        private ArrayWriter(LogicalType t, GroupType groupType) {
+
+            // Get the internal array structure
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            Type elementType = repeatedType.getType(0);
+            this.elementName = elementType.getName();
+
+            this.elementWriter = createWriter(t, elementType);
+            this.elementGetter = ArrayData.createElementGetter(t);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+            ArrayData arrayData = row.getArray(ordinal);
+            int listLength = arrayData.size();
+
+            if (listLength > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                for (int i = 0; i < listLength; i++) {
+                    Object object = elementGetter.getElementOrNull(arrayData, i);
+                    recordConsumer.startGroup();
+                    if (object != null) {
+                        recordConsumer.startField(elementName, 0);
+                        elementWriter.write(object);
+                        recordConsumer.endField(elementName, 0);
+                    }
+                    recordConsumer.endGroup();
+                }
+
+                recordConsumer.endField(repeatedGroupName, 0);
+            }
+            recordConsumer.endGroup();
+        }
+
+        @Override
+        public void write(Object value) {}
+    }
+
+    private class GroupTypeWriter implements FieldWriter {

Review comment:
       merged







[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761000484



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +293,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {

Review comment:
       Supports multiple types in the latest commit.







[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761831780



##########
File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
##########
@@ -158,6 +190,65 @@ private void innerTest(Configuration conf, boolean utcTimestamp) throws IOExcept
         Assert.assertEquals(number, cnt);
     }
 
+    public void complexTypeTest(Configuration conf, boolean utcTimestamp) throws Exception {
+        Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+        int number = 1000;
+        List<Row> rows = new ArrayList<>(number);
+        Map<String, String> mapData = new HashMap<>();
+        mapData.put("k1", "v1");
+        mapData.put(null, "v2");
+        mapData.put("k2", null);
+
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            rows.add(Row.of(new Integer[] {v}, mapData, Row.of(String.valueOf(v), v)));
+        }
+
+        ParquetWriterFactory<RowData> factory =
+                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE_COMPLEX, conf, utcTimestamp);
+        BulkWriter<RowData> writer =
+                factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+        for (int i = 0; i < number; i++) {
+            writer.addElement(CONVERTER_COMPLEX.toInternal(rows.get(i)));
+        }
+        writer.flush();
+        writer.finish();
+
+        File file = new File(path.getPath());
+        final List<Row> fileContent = readParquetFile(file);
+        assertEquals(rows, fileContent);
+    }
+
+    private static List<Row> readParquetFile(File file) throws IOException {
+        InputFile inFile =
+                HadoopInputFile.fromPath(
+                        new org.apache.hadoop.fs.Path(file.toURI()), new Configuration());
+
+        ArrayList<Row> results = new ArrayList<>();
+        try (ParquetReader<GenericRecord> reader =
+                AvroParquetReader.<GenericRecord>builder(inFile).build()) {
+            GenericRecord next;
+            while ((next = reader.read()) != null) {
+                Integer c0 = (Integer) ((ArrayList<GenericData.Record>) next.get(0)).get(0).get(0);
+                HashMap<Utf8, Utf8> map = ((HashMap<Utf8, Utf8>) next.get(1));
+                String c21 = ((GenericData.Record) next.get(2)).get(0).toString();
+                Integer c22 = (Integer) ((GenericData.Record) next.get(2)).get(1);
+
+                Map<String, String> c1 = new HashMap<>();
+                for (Utf8 key : map.keySet()) {
+                    String k = Strings.isEmpty(key) ? null : key.toString();

Review comment:
       done







[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   ## CI report:
   
   * d0d2a946486ca5c1c021b3fb81b27870ca04ec29 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27491) 
   * fc6d9ef126185b79ec6edc6d4feedf0df95564f6 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   ## CI report:
   
   * fc6d9ef126185b79ec6edc6d4feedf0df95564f6 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27524) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   ## CI report:
   
   * d3fc88995b30a57e2572047cfccae3ab61bfc9e5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25698) 
   





[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-957121466


   @flinkbot run azure





[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r735603570



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -126,13 +134,27 @@ private FieldWriter createWriter(LogicalType t, Type type) {
                     throw new UnsupportedOperationException("Unsupported type: " + type);
             }
         } else {
-            throw new IllegalArgumentException("Unsupported  data type: " + t);
+            GroupType groupType = type.asGroupType();
+            LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
+
+            if (t instanceof ArrayType
+                    && logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+                return new ArrayWriter(((ArrayType) t).getElementType(), groupType);
+            } else if (t instanceof MapType
+                    && logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
+                return new MapWriter(
+                        ((MapType) t).getKeyType(), ((MapType) t).getValueType(), groupType);
+            } else {
+                return new RowWriter(t, groupType);

Review comment:
       Sorry, I will throw an `UnsupportedOperationException` here.
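
   i.e. the fallthrough becomes an explicit failure instead of silently building a `RowWriter` (a sketch of the intended shape; `RowType` is `org.apache.flink.table.types.logical.RowType`):

   ```java
   if (t instanceof RowType) {
       return new RowWriter(t, groupType);
   } else {
       throw new UnsupportedOperationException("Unsupported type: " + t);
   }
   ```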











[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   ## CI report:
   
   * 6d75c9ad1b7ec1ef5121b0d2f348f2a6b966eb73 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25421) 
   





[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r735603151



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       `null` values are filtered out in `MapWriter` and `ArrayWriter`.







[GitHub] [flink] meetjunsu commented on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-956078481


   @snuyanzin Could this PR be merged? I will add the corner-case tests in another PR about reading composite types, because the code has dependencies; I will submit it soon, and you can review it at that time.





[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761001017



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -224,6 +293,176 @@ private TimestampWriter(int precision) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision)));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBinary(timestampToInt96((TimestampData) value));
+        }
+    }
+
+    private class MapWriter implements FieldWriter {
+
+        private String repeatedGroupName;
+        private String keyName, valueName;
+        private FieldWriter keyWriter, valueWriter;
+        private ArrayData.ElementGetter keyElementGetter, valueElementGetter;
+
+        private MapWriter(LogicalType keyType, LogicalType valueType, GroupType groupType) {
+
+            // Get the internal map structure (MAP_KEY_VALUE)
+            GroupType repeatedType = groupType.getType(0).asGroupType();
+            this.repeatedGroupName = repeatedType.getName();
+
+            // Get key element information
+            Type type = repeatedType.getType(0);
+            this.keyName = type.getName();
+            this.keyWriter = createWriter(keyType, type);
+
+            // Get value element information
+            Type valuetype = repeatedType.getType(1);
+            this.valueName = valuetype.getName();
+            this.valueWriter = createWriter(valueType, valuetype);
+
+            this.keyElementGetter = ArrayData.createElementGetter(keyType);
+            this.valueElementGetter = ArrayData.createElementGetter(valueType);
+        }
+
+        @Override
+        public void write(RowData row, int ordinal) {
+            recordConsumer.startGroup();
+
+            MapData mapData = row.getMap(ordinal);
+
+            if (mapData != null && mapData.size() > 0) {
+                recordConsumer.startField(repeatedGroupName, 0);
+
+                ArrayData keyArray = mapData.keyArray();
+                ArrayData valueArray = mapData.valueArray();
+                for (int i = 0; i < keyArray.size(); i++) {
+                    Object key = keyElementGetter.getElementOrNull(keyArray, i);
+                    Object value = valueElementGetter.getElementOrNull(valueArray, i);
+
+                    recordConsumer.startGroup();
+                    if (key != null) {

Review comment:
       Nulls are supported in the latest commit.
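
       For context, a rough sketch of what the null handling inside the repeated key_value group can look like (based on the fields in the excerpt above; Parquet encodes null by omitting the field, and the exact code in the latest commit may differ):

           recordConsumer.startGroup();
           // Omitting a field in Parquet encodes null, so a null key or
           // value is simply skipped instead of written.
           if (key != null) {
               recordConsumer.startField(keyName, 0);
               keyWriter.write(key);
               recordConsumer.endField(keyName, 0);
           }
           if (value != null) {
               recordConsumer.startField(valueName, 1);
               valueWriter.write(value);
               recordConsumer.endField(valueName, 1);
           }
           recordConsumer.endGroup();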







[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 45f8c8bfb9c3ff5fad9d66457ceed4a1551af392 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27570) 
   * 1b1e007b3b10145f38249f664a6b69f70da12206 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r761829487



##########
File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
##########
@@ -158,6 +190,65 @@ private void innerTest(Configuration conf, boolean utcTimestamp) throws IOExcept
         Assert.assertEquals(number, cnt);
     }
 
+    public void complexTypeTest(Configuration conf, boolean utcTimestamp) throws Exception {
+        Path path = new Path(TEMPORARY_FOLDER.newFolder().getPath(), UUID.randomUUID().toString());
+        int number = 1000;
+        List<Row> rows = new ArrayList<>(number);
+        Map<String, String> mapData = new HashMap<>();
+        mapData.put("k1", "v1");
+        mapData.put(null, "v2");
+        mapData.put("k2", null);
+
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            rows.add(Row.of(new Integer[] {v}, mapData, Row.of(String.valueOf(v), v)));
+        }
+
+        ParquetWriterFactory<RowData> factory =
+                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE_COMPLEX, conf, utcTimestamp);
+        BulkWriter<RowData> writer =
+                factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
+        for (int i = 0; i < number; i++) {
+            writer.addElement(CONVERTER_COMPLEX.toInternal(rows.get(i)));
+        }
+        writer.flush();
+        writer.finish();
+
+        File file = new File(path.getPath());
+        final List<Row> fileContent = readParquetFile(file);
+        assertEquals(rows, fileContent);
+    }
+
+    private static List<Row> readParquetFile(File file) throws IOException {
+        InputFile inFile =
+                HadoopInputFile.fromPath(
+                        new org.apache.hadoop.fs.Path(file.toURI()), new Configuration());
+
+        ArrayList<Row> results = new ArrayList<>();
+        try (ParquetReader<GenericRecord> reader =
+                AvroParquetReader.<GenericRecord>builder(inFile).build()) {
+            GenericRecord next;
+            while ((next = reader.read()) != null) {
+                Integer c0 = (Integer) ((ArrayList<GenericData.Record>) next.get(0)).get(0).get(0);
+                HashMap<Utf8, Utf8> map = ((HashMap<Utf8, Utf8>) next.get(1));
+                String c21 = ((GenericData.Record) next.get(2)).get(0).toString();
+                Integer c22 = (Integer) ((GenericData.Record) next.get(2)).get(1);
+
+                Map<String, String> c1 = new HashMap<>();
+                for (Utf8 key : map.keySet()) {
+                    String k = Strings.isEmpty(key) ? null : key.toString();

Review comment:
       Oh? Then I may have been wrong about that yesterday.







[GitHub] [flink] JingsongLi commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r760729158



##########
File path: flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
##########
@@ -120,15 +144,18 @@ private void innerTest(Configuration conf, boolean utcTimestamp) throws IOExcept
                             toDateTime(v),
                             BigDecimal.valueOf(v),
                             BigDecimal.valueOf(v),
-                            BigDecimal.valueOf(v)));
+                            BigDecimal.valueOf(v),
+                            new Integer[] {v},
+                            mapData,
+                            Row.of(String.valueOf(v), v)));
         }
 
         ParquetWriterFactory<RowData> factory =
-                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE, conf, utcTimestamp);
+                ParquetRowDataBuilder.createWriterFactory(ROW_TYPE_COMPLEX, conf, utcTimestamp);
         BulkWriter<RowData> writer =
                 factory.create(path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE));
         for (int i = 0; i < number; i++) {
-            writer.addElement(CONVERTER.toInternal(rows.get(i)));
+            writer.addElement(CONVERTER_COMPLEX.toInternal(rows.get(i)));
         }
         writer.flush();
         writer.finish();

Review comment:
       I think you need to add an assertion below; there is no validation of the written data.
   Maybe you can use something like `ParquetAvroStreamingFileSinkITCase.readParquetFile`.
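
       A minimal sketch of the suggested check, along the lines of what the complexTypeTest excerpt earlier in this thread does (names assume that test's setup):

           File file = new File(path.getPath());
           List<Row> fileContent = readParquetFile(file);
           assertEquals(rows, fileContent);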







[GitHub] [flink] flinkbot edited a comment on pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17542:
URL: https://github.com/apache/flink/pull/17542#issuecomment-949170131


   ## CI report:
   
   * 20b5850a68b1463d36796ccbfb8ea3eae5bf1221 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=27426) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737956485



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       This issue has been logged. https://issues.apache.org/jira/browse/FLINK-24614







[GitHub] [flink] meetjunsu commented on a change in pull request #17542: [FLINK-17782] Add array,map,row types support for parquet row writer

Posted by GitBox <gi...@apache.org>.
meetjunsu commented on a change in pull request #17542:
URL: https://github.com/apache/flink/pull/17542#discussion_r737293619



##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
##########
@@ -141,6 +163,11 @@ private FieldWriter createWriter(LogicalType t, Type type) {
         public void write(RowData row, int ordinal) {
             recordConsumer.addBoolean(row.getBoolean(ordinal));
         }
+
+        @Override
+        public void write(Object value) {
+            recordConsumer.addBoolean((boolean) value);

Review comment:
       Must test cases be added before merging this PR?



