Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/19 22:16:24 UTC

[GitHub] [beam] talatuyarer opened a new pull request, #22348: Initial Commit for AvroPayloadSerializer

talatuyarer opened a new pull request, #22348:
URL: https://github.com/apache/beam/pull/22348

   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] steveniemitz commented on a diff in pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22348:
URL: https://github.com/apache/beam/pull/22348#discussion_r925553392


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/avro/RowDeserializer.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils.avro;
+
+import java.io.IOException;
+import org.apache.avro.io.Decoder;
+
+public interface RowDeserializer<T> {

Review Comment:
   Is there a reason to use this interface over the standard `DatumReader` interface Avro provides? Same for RowSerializer.



[GitHub] [beam] talatuyarer commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
talatuyarer commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1189606385

   R: @TheNeuralBit 
   Could you review this? This is an initial commit. I will continue developing it based on your feedback.
   Thanks


[GitHub] [beam] talatuyarer commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
talatuyarer commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1190916883

   > I'd be interested to see the performance of this vs [FastReaderBuilder.createDatumReader](https://avro.apache.org/docs/current/api/java/org/apache/avro/io/FastReaderBuilder.html#createDatumReader-org.apache.avro.Schema-org.apache.avro.Schema-) and wrapping the resulting object in a Row.
   
   Thank you for reviewing my PR, @steveniemitz. Let me write a JMH test and share the results. 
   I will reply to your other comments once I have the JMH results. Does that sound good?


[GitHub] [beam] TheNeuralBit commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1190853794

   > I'd be interested to see the performance of this vs [FastReaderBuilder.createDatumReader](https://avro.apache.org/docs/current/api/java/org/apache/avro/io/FastReaderBuilder.html#createDatumReader-org.apache.avro.Schema-org.apache.avro.Schema-) and wrapping the resulting object in a Row.
   
   That documentation is sparse, is there more somewhere?


[GitHub] [beam] steveniemitz commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1190920873

   > That documentation is sparse, is there more somewhere?
   
   I'm not sure actually, I'd stumbled across it a year or two ago because I was doing something very similar to what's being done here.  https://issues.apache.org/jira/browse/AVRO-2247 is the most detail I can find on the work being done to improve the performance.
   
   I realize now after writing that comment though that this wasn't in the java avro library until 1.10, and beam is still on 1.8 (or maybe 1.9?), so it might not be particularly relevant anyways.


Re: [PR] Initial Commit for AvroPayloadSerializer [beam]

Posted by "talatuyarer (via GitHub)" <gi...@apache.org>.
talatuyarer commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1815043375

   Hi @damondouglas, let me work on it. I want to get this PR merged into Beam. It makes Beam's Avro serialization more than 3x faster. 


[GitHub] [beam] github-actions[bot] commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1671255271

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.


Re: [PR] Initial Commit for AvroPayloadSerializer [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1893665102

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.


Re: [PR] Initial Commit for AvroPayloadSerializer [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-2003806700

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.


[GitHub] [beam] github-actions[bot] commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1189607165

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control


[GitHub] [beam] talatuyarer commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
talatuyarer commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1190908114

   > I'd be interested to see the performance of this vs [FastReaderBuilder.createDatumReader](https://avro.apache.org/docs/current/api/java/org/apache/avro/io/FastReaderBuilder.html#createDatumReader-org.apache.avro.Schema-org.apache.avro.Schema-) and wrapping the resulting object in a Row.
   
   Let me try to write a JMH test.


[GitHub] [beam] github-actions[bot] commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1372171105

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.


Re: [PR] Initial Commit for AvroPayloadSerializer [beam]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1755333401

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.


[GitHub] [beam] TheNeuralBit commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1191711844

   > > That documentation is sparse, is there more somewhere?
   > 
   > I'm not sure actually, I'd stumbled across it a year or two ago because I was doing something very similar to what's being done here. https://issues.apache.org/jira/browse/AVRO-2247 is the most detail I can find on the work being done to improve the performance.
   > 
   > I realize now after writing that comment though that this wasn't in the java avro library until 1.10, and beam is still on 1.8 (or maybe 1.9?), so it might not be particularly relevant anyways.
   
   Ah, OK. If we run some benchmarks now and the results look good, we could file an issue to consider switching to it when/if we upgrade Avro. For reference, #19969 is the only Avro upgrade issue I can find.


[GitHub] [beam] steveniemitz commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1190255231

   I'd be interested to see the performance of this vs [FastReaderBuilder.createDatumReader](https://avro.apache.org/docs/current/api/java/org/apache/avro/io/FastReaderBuilder.html#createDatumReader-org.apache.avro.Schema-org.apache.avro.Schema-) and wrapping the resulting object in a Row.


[GitHub] [beam] github-actions[bot] commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1458113476

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.


Re: [PR] Initial Commit for AvroPayloadSerializer [beam]

Posted by "damondouglas (via GitHub)" <gi...@apache.org>.
damondouglas commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1814906758

   Good day, @talatuyarer. Thank you again for making this contribution! Would you like to still work on this PR or should I close it? We appreciate what you do for the Beam community!


[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #22348:
URL: https://github.com/apache/beam/pull/22348#discussion_r926118009


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/avro/utils/SerDesUtils.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils.avro.utils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.CodeSource;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SerDesUtils {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SerDesUtils.class);
+
+  private static final Map<Schema, Long> SCHEMA_IDS_CACHE = new HashMap<>();
+  private static final String SEP = "_";
+
+  /**
+   * This function will produce a fingerprint for the provided schema.
+   *
+   * @param schema a schema
+   * @return fingerprint for the given schema
+   */
+  public static Long getSchemaFingerprint(Schema schema) {
+    Long schemaId = SCHEMA_IDS_CACHE.get(schema);
+    if (schemaId == null) {
+      schemaId = SchemaNormalization.parsingFingerprint64(schema);
+      SCHEMA_IDS_CACHE.put(schema, schemaId);
+    }
+
+    return schemaId;
+  }
+
+  public static String getSchemaKey(Schema writerSchema, Schema readerSchema) {

Review Comment:
   nit: is this used?
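The `getSchemaFingerprint` helper in this hunk memoizes fingerprints in a static `HashMap`, which is not safe if several worker threads reach it at once. A minimal sketch of a thread-safe alternative follows, assuming `ConcurrentHashMap.computeIfAbsent` is acceptable here. To stay self-contained it keys on the schema's Parsing Canonical Form string (which real code could obtain from `SchemaNormalization.toParsingForm(schema)`) and reimplements the CRC-64-AVRO fingerprint from the Avro specification instead of calling `SchemaNormalization.parsingFingerprint64`. The class name `FingerprintCache` and its `computations` counter are hypothetical, and the cache is an instance field rather than a static one only to keep it testable.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical thread-safe fingerprint cache keyed by a schema's Parsing Canonical Form. */
class FingerprintCache {
  // CRC-64-AVRO seed value, as given in the Avro specification.
  static final long EMPTY = 0xc15d213aa4d7a795L;
  private static final long[] FP_TABLE = new long[256];

  static {
    // Precompute the lookup table once, following the specification's initFPTable().
    for (int i = 0; i < 256; i++) {
      long fp = i;
      for (int j = 0; j < 8; j++) {
        fp = (fp >>> 1) ^ (EMPTY & -(fp & 1L));
      }
      FP_TABLE[i] = fp;
    }
  }

  // ConcurrentHashMap instead of HashMap, so concurrent callers cannot corrupt the cache.
  private final Map<String, Long> cache = new ConcurrentHashMap<>();
  // Counts real fingerprint computations, so cache hits are observable.
  final AtomicInteger computations = new AtomicInteger();

  /** CRC-64-AVRO (Rabin) fingerprint of the given bytes. */
  static long fingerprint64(byte[] buf) {
    long fp = EMPTY;
    for (byte b : buf) {
      fp = (fp >>> 8) ^ FP_TABLE[(int) (fp ^ b) & 0xff];
    }
    return fp;
  }

  /** Returns the cached fingerprint, computing it at most once per canonical form. */
  long getFingerprint(String canonicalForm) {
    return cache.computeIfAbsent(
        canonicalForm,
        key -> {
          computations.incrementAndGet();
          return fingerprint64(key.getBytes(StandardCharsets.UTF_8));
        });
  }
}
```

`computeIfAbsent` on a `ConcurrentHashMap` applies the mapping function at most once per key, so two threads asking for the same schema never compute or store the fingerprint twice.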



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -439,25 +443,29 @@ public static <T> SerializableFunction<Row, T> getFromRowFunction(Class<T> clazz
 
   /** Returns a function mapping encoded AVRO {@link GenericRecord}s to Beam {@link Row}s. */
   public static SimpleFunction<byte[], Row> getAvroBytesToRowFunction(Schema beamSchema) {
-    return new AvroBytesToRowFn(beamSchema);
+    return new FastAvroBytesToRowFn(beamSchema);
   }
 
-  private static class AvroBytesToRowFn extends SimpleFunction<byte[], Row> {
-    private final AvroCoder<GenericRecord> coder;
-    private final Schema beamSchema;
+  private static class FastAvroBytesToRowFn extends SimpleFunction<byte[], Row> {
+    private final SerDesRegistry registry;
+    private final org.apache.avro.Schema avroSchema;
 
-    AvroBytesToRowFn(Schema beamSchema) {
-      org.apache.avro.Schema avroSchema = toAvroSchema(beamSchema);
-      coder = AvroCoder.of(avroSchema);
-      this.beamSchema = beamSchema;
+    FastAvroBytesToRowFn(Schema beamSchema) {
+      avroSchema = toAvroSchema(beamSchema);
+      //Create Registry in Setup
+      registry = SerDesRegistry.getDefaultInstance();
     }
 
     @Override
     public Row apply(byte[] bytes) {
       try {
-        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
-        GenericRecord record = coder.decode(inputStream);
-        return AvroUtils.toBeamRowStrict(record, beamSchema);
+        //Create Avro decoder
+        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
+        //Get Deserializer
+        RowDeserializer<Row> deserializer = registry.buildRowDeserializer(avroSchema, avroSchema);

Review Comment:
   Do we need the registry? Couldn't we just do the code generation once when creating the instance of FastAvroBytesToRowFn?
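TheNeuralBit's suggestion (build the deserializer once per function instance instead of asking the registry on every `apply`) can be sketched without Beam or Avro on the classpath. Because a generated deserializer is unlikely to be serializable, one common way to do this is to ship only a serializable description of the schema and build the expensive object lazily into a `transient` field. In this sketch `BytesToRowFn` and its `String` output are hypothetical stand-ins for `FastAvroBytesToRowFn` and `Row`, and the lambda stands in for the generated `RowDeserializer`. It also assumes, as Beam does for DoFn instances, that one instance is not invoked from several threads at once, so the lazy initialization is unsynchronized.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical stand-in for FastAvroBytesToRowFn; String stands in for Row. */
class BytesToRowFn implements Function<byte[], String>, Serializable {
  private static final long serialVersionUID = 1L;

  // Counts how many times the expensive decoder is built.
  static final AtomicInteger BUILDS = new AtomicInteger();

  // Serializable description shipped with the function (e.g. the Avro schema JSON).
  private final String schemaJson;
  // Not serialized; rebuilt lazily on each worker after deserialization.
  private transient Function<byte[], String> decoder;

  BytesToRowFn(String schemaJson) {
    this.schemaJson = schemaJson;
  }

  private Function<byte[], String> decoder() {
    if (decoder == null) {
      // Stand-in for the expensive step, e.g. generating and loading a deserializer class.
      BUILDS.incrementAndGet();
      decoder = bytes -> schemaJson + ":" + bytes.length;
    }
    return decoder;
  }

  @Override
  public String apply(byte[] bytes) {
    // No per-element registry lookup: the same decoder is reused for every call.
    return decoder().apply(bytes);
  }
}
```

Building in the constructor would also run the generation only once, but it would then have to survive serialization to the workers; the lazy `transient` field avoids that requirement.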



[GitHub] [beam] codecov[bot] commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1189622465

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22348?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22348](https://codecov.io/gh/apache/beam/pull/22348?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (bfa5999) into [master](https://codecov.io/gh/apache/beam/commit/d54841c985f7c3a13ce3b09574f76bfcb7eb5e40?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d54841c) will **decrease** coverage by `0.04%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22348      +/-   ##
   ==========================================
   - Coverage   74.24%   74.19%   -0.05%     
   ==========================================
     Files         702      703       +1     
     Lines       92999    93084      +85     
   ==========================================
   + Hits        69045    69062      +17     
   - Misses      22687    22755      +68     
     Partials     1267     1267              
   ```
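The header's "decrease coverage by 0.04%" and the table's "-0.05%" describe the same change. Codecov's coverage ratio is hits divided by hits + misses + partials, and the counts above reproduce both percentages (about 74.24% and 74.19%). The drop is roughly 0.049 percentage points, which the header appears to truncate and the table to round. A small check, with the class name `CoverageCheck` chosen only for illustration:

```java
/** Recomputes the Codecov percentages from the raw counts in the report. */
class CoverageCheck {
  // Only hits count as covered; misses and partials count against coverage.
  static double coveragePercent(long hits, long misses, long partials) {
    return 100.0 * hits / (hits + misses + partials);
  }

  public static void main(String[] args) {
    double master = coveragePercent(69045, 22687, 1267); // 92999 lines on master
    double pr = coveragePercent(69062, 22755, 1267); // 93084 lines with #22348
    System.out.printf("master=%.4f%% pr=%.4f%% delta=%.4f points%n", master, pr, pr - master);
  }
}
```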
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.53% <ø> (-0.09%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22348?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...python/apache\_beam/runners/worker/worker\_status.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvd29ya2VyX3N0YXR1cy5weQ==) | `77.53% <0.00%> (-2.18%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `88.46% <0.00%> (-0.95%)` | :arrow_down: |
   | [...dks/python/apache\_beam/options/pipeline\_options.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vb3B0aW9ucy9waXBlbGluZV9vcHRpb25zLnB5) | `94.34% <0.00%> (-0.42%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/external.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9leHRlcm5hbC5weQ==) | `79.73% <0.00%> (-0.11%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/kafka.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2Fma2EucHk=) | `80.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/ml/inference/base.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL2Jhc2UucHk=) | `95.37% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/utils/annotations.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvYW5ub3RhdGlvbnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...ache\_beam/coders/proto2\_coder\_test\_messages\_pb2.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vY29kZXJzL3Byb3RvMl9jb2Rlcl90ZXN0X21lc3NhZ2VzX3BiMi5weQ==) | `100.00% <0.00%> (ø)` | |
   | [...es/snippets/transforms/elementwise/runinference.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9lbGVtZW50d2lzZS9ydW5pbmZlcmVuY2UucHk=) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/fileio.py](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZWlvLnB5) | `96.26% <0.00%> (+0.03%)` | :arrow_up: |
   | ... and [5 more](https://codecov.io/gh/apache/beam/pull/22348/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   


[GitHub] [beam] steveniemitz commented on a diff in pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22348:
URL: https://github.com/apache/beam/pull/22348#discussion_r925560370


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/avro/RowDeserializerCodeGenerator.java:
##########
@@ -0,0 +1,1180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.schemas.utils.avro;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.beam.sdk.schemas.utils.avro.utils.SerDesUtils.getClassName;
+import static org.apache.beam.sdk.schemas.utils.avro.utils.SerDesUtils.getSchemaFingerprint;
+
+import com.sun.codemodel.JArray;
+import com.sun.codemodel.JBlock;
+import com.sun.codemodel.JCatchBlock;
+import com.sun.codemodel.JClass;
+import com.sun.codemodel.JConditional;
+import com.sun.codemodel.JDefinedClass;
+import com.sun.codemodel.JDoLoop;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JFieldVar;
+import com.sun.codemodel.JForLoop;
+import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JMethod;
+import com.sun.codemodel.JMod;
+import com.sun.codemodel.JPackage;
+import com.sun.codemodel.JStatement;
+import com.sun.codemodel.JTryBlock;
+import com.sun.codemodel.JVar;
+import org.apache.beam.sdk.schemas.utils.avro.exceptions.RowSerdesGeneratorException;
+import org.apache.beam.sdk.schemas.utils.avro.utils.SerDesUtils;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
+import org.apache.avro.io.parsing.Symbol;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.StringUtils;
+import org.codehaus.jackson.JsonNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RowDeserializerCodeGenerator<T> extends SerDesBase {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RowDeserializerCodeGenerator.class);
+
+  private static final String DECODER = "decoder";
+  private static final String REUSE = "reuse";
+
+  private final Schema writer;
+  private final Schema reader;
+  private JDefinedClass deserializerClass;
+  private JFieldVar schemaMapField;
+  private JMethod generateBeamSchemasMethod;
+  private Map<Long, Schema> schemaMap = new HashMap<>();
+  private Map<Long, JVar> schemaVarMap = new HashMap<>();
+  private Map<String, JMethod> deserializeMethodMap = new HashMap<>();
+  private Map<String, JMethod> skipMethodMap = new HashMap<>();
+  private Map<JMethod, Set<Class<? extends Exception>>> exceptionFromMethodMap = new HashMap<>();
+  private JFieldVar avroSchemaMapField;
+
+
+  /**
+   * Row Deserializer Code generator class. Based on given schemas it generates Java code.
+   *
+   * @param writer           the writer's Avro schema
+   * @param reader           the reader's Avro schema
+   * @param destination      Path for generated java codes.
+   * @param classLoader      classLoader
+   * @param compileClassPath Path for compiled java classes.
+   */
+  public RowDeserializerCodeGenerator(Schema writer, Schema reader, File destination,
+      ClassLoader classLoader, String compileClassPath) {
+    super("deserializer", destination, classLoader, compileClassPath);
+    this.writer = writer;
+    this.reader = reader;
+    LOGGER.warn("RowDeserializerCodeGenerator Constructed.");
+  }
+
+  /**
+   * Row Deserializer Code generator. Based on given constructor's variable it generates Java Code
+   * and load it in classpath then return instance of the generated code.
+   *
+   * @return {@link RowDeserializer}
+   */
+  public RowDeserializer<T> generateDeserializer() {
+    LOGGER.warn("Start generating code of deserializer");
+    String className = getClassName(writer, reader, "RowDeserializer");
+    JPackage classPackage = codeModel._package(this.generatedPackageName);
+    LOGGER.warn("Start generating code of deserializer: " + className);
+
+    try {
+      deserializerClass = classPackage._class(className);
+
+      JVar readerSchemaVar = deserializerClass
+          .field(JMod.PRIVATE | JMod.FINAL, Schema.class, "avroSchema");
+      JMethod constructor = deserializerClass.constructor(JMod.PUBLIC);
+      JVar constructorParam = constructor.param(Schema.class, "avroSchema");
+      constructor.body().assign(JExpr.refthis(readerSchemaVar.name()), constructorParam);
+
+      Schema aliasedWriterSchema = Schema.applyAliases(writer, reader);
+      Symbol resolvingGrammar = new ResolvingGrammarGenerator()
+          .generate(aliasedWriterSchema, reader);

Review Comment:
   I would use [Resolver.resolve](https://avro.apache.org/docs/current/api/java/org/apache/avro/Resolver.html#resolve-org.apache.avro.Schema-org.apache.avro.Schema-) here rather than trying to consume the grammar directly. It was built for essentially this purpose, building your own decoders, without requiring you to understand the underlying complexities of the grammar. See also [refactoring-resolution](https://github.com/apache/avro/blob/master/doc-deprecated/src/content/mddocs/refactoring-resolution.md). In my opinion, using the resolver is significantly easier and less error-prone than interpreting the grammar directly.
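   The difference the comment points at, roughly: `Resolver.resolve(writer, reader)` (Avro 1.9+) returns a tree of `Resolver.Action` objects instead of a stream of parser symbols. For a record, this is a `Resolver.RecordAdjust` that holds one action per writer field (a skip when the reader lacks that field) plus the reader-only fields that must come from defaults. The Java below is only a toy, stdlib-only model of that record-level idea. It assumes flat records of primitive fields matched by name, with no aliases, unions, or nested types. `ResolverSketch`, `Kind` and `FieldAction` are hypothetical names, not Avro's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of record-level schema resolution; NOT Avro's Resolver API (names are hypothetical). */
final class ResolverSketch {
  enum Kind { READ, PROMOTE, SKIP, DEFAULT, ERROR }

  static final class FieldAction {
    final String field;
    final Kind kind;
    final String detail;

    FieldAction(String field, Kind kind, String detail) {
      this.field = field;
      this.kind = kind;
      this.detail = detail;
    }

    @Override
    public String toString() {
      return kind + " " + field + " (" + detail + ")";
    }
  }

  // Primitive promotions permitted by the Avro specification's schema resolution rules.
  static final Map<String, Set<String>> PROMOTIONS = Map.of(
      "int", Set.of("long", "float", "double"),
      "long", Set.of("float", "double"),
      "float", Set.of("double"),
      "string", Set.of("bytes"),
      "bytes", Set.of("string"));

  /** Records are (field name -> primitive type) maps whose iteration order is the field order. */
  static List<FieldAction> resolve(
      Map<String, String> writer, Map<String, String> reader, Set<String> readerDefaults) {
    List<FieldAction> actions = new ArrayList<>();
    // Writer fields come first, in writer order, because that is their order in the encoded bytes.
    for (Map.Entry<String, String> w : writer.entrySet()) {
      String name = w.getKey();
      String writerType = w.getValue();
      String readerType = reader.get(name);
      if (readerType == null) {
        actions.add(new FieldAction(name, Kind.SKIP, writerType));
      } else if (readerType.equals(writerType)) {
        actions.add(new FieldAction(name, Kind.READ, readerType));
      } else if (PROMOTIONS.getOrDefault(writerType, Set.of()).contains(readerType)) {
        actions.add(new FieldAction(name, Kind.PROMOTE, writerType + "->" + readerType));
      } else {
        actions.add(new FieldAction(name, Kind.ERROR, writerType + " is not readable as " + readerType));
      }
    }
    // Reader-only fields are filled from their defaults; without a default, resolution fails.
    for (Map.Entry<String, String> r : reader.entrySet()) {
      if (!writer.containsKey(r.getKey())) {
        Kind kind = readerDefaults.contains(r.getKey()) ? Kind.DEFAULT : Kind.ERROR;
        actions.add(new FieldAction(r.getKey(), kind, r.getValue()));
      }
    }
    return actions;
  }

  public static void main(String[] args) {
    Map<String, String> writer = new LinkedHashMap<>();
    writer.put("id", "int");
    writer.put("legacy", "string");
    writer.put("name", "string");
    Map<String, String> reader = new LinkedHashMap<>();
    reader.put("name", "string");
    reader.put("id", "long");
    reader.put("score", "double");
    // A code generator would emit one read/skip/convert step per action, in this order.
    for (FieldAction a : resolve(writer, reader, Set.of("score"))) {
      System.out.println(a);
    }
  }
}
```

   Running `main` prints `PROMOTE id (int->long)`, `SKIP legacy (string)`, `READ name (string)` and `DEFAULT score (double)`. A generator driven by such a plan only switches on a small, closed set of action kinds. A grammar-driven generator, by contrast, has to track parser state itself.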





[GitHub] [beam] steveniemitz commented on a diff in pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22348:
URL: https://github.com/apache/beam/pull/22348#discussion_r925552644


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -439,25 +443,29 @@ public static <T> SerializableFunction<Row, T> getFromRowFunction(Class<T> clazz
 
   /** Returns a function mapping encoded AVRO {@link GenericRecord}s to Beam {@link Row}s. */
   public static SimpleFunction<byte[], Row> getAvroBytesToRowFunction(Schema beamSchema) {

Review Comment:
   Maybe it'd make sense here to allow users to pass in a factory for `DatumReader` and `DatumWriter` instances. This implementation could then be the default, and users who have their own implementations could supply those instead. This is similar to how [AvroIO.readFiles](https://beam.apache.org/releases/javadoc/2.39.0/org/apache/beam/sdk/io/AvroIO.ReadFiles.html#withDatumReaderFactory-org.apache.beam.sdk.io.AvroSource.DatumReaderFactory-) works.
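   One reading of the suggestion is to keep `getAvroBytesToRowFunction(Schema)` as it is and add an overload that takes a factory, with today's implementation as the default. This is much as `AvroSource.DatumReaderFactory` does for `AvroIO`. The Java below sketches only that shape, with the standard library standing in for Avro and Beam. `ReaderFactory`, `defaultFactory` and `getBytesToValueFunction` are hypothetical names, a generic `S` stands in for the schema type, and `String` stands in for `Row`. A real version would presumably have the factory return Avro `DatumReader`/`DatumWriter` instances and wrap the result in a `SimpleFunction`.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.function.Function;

/** Stdlib-only sketch of a pluggable reader factory; all names here are hypothetical. */
final class FactorySketch {
  /** Builds a decoder for one schema; Serializable so it can be shipped with pipeline functions. */
  @FunctionalInterface
  interface ReaderFactory<S, T> extends Serializable {
    Function<byte[], T> create(S schema);
  }

  /** Default factory, standing in for the current built-in implementation. */
  static <S> ReaderFactory<S, String> defaultFactory() {
    return schema -> bytes -> new String(bytes, StandardCharsets.UTF_8);
  }

  /** Existing entry point keeps its behavior by delegating with the default factory. */
  static <S> Function<byte[], String> getBytesToValueFunction(S schema) {
    return getBytesToValueFunction(schema, FactorySketch.<S>defaultFactory());
  }

  /** New overload: callers with their own reader implementation pass a factory. */
  static <S, T> Function<byte[], T> getBytesToValueFunction(S schema, ReaderFactory<S, T> factory) {
    // The reader is created once here, not once per decoded element.
    return factory.create(schema);
  }

  public static void main(String[] args) {
    byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
    Function<byte[], String> builtIn = getBytesToValueFunction("schema-v1");
    ReaderFactory<String, String> custom =
        schema -> bytes -> schema + ":" + new String(bytes, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
    Function<byte[], String> userSupplied = getBytesToValueFunction("schema-v1", custom);
    System.out.println(builtIn.apply(payload));      // hello
    System.out.println(userSupplied.apply(payload)); // schema-v1:HELLO
  }
}
```

   Keeping the one-argument method as a thin delegate means existing callers see no change. Making the factory `Serializable` mirrors the constraint that anything captured by a Beam function has to survive serialization to workers.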





[GitHub] [beam] github-actions[bot] commented on pull request #22348: Initial Commit for AvroPayloadSerializer

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22348:
URL: https://github.com/apache/beam/pull/22348#issuecomment-1582512770

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

