Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/04/06 23:49:00 UTC

[jira] [Work logged] (BEAM-12114) Eliminate beam_fn_api from KafkaIO expansion

     [ https://issues.apache.org/jira/browse/BEAM-12114?focusedWorklogId=578013&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-578013 ]

ASF GitHub Bot logged work on BEAM-12114:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Apr/21 23:48
            Start Date: 06/Apr/21 23:48
    Worklog Time Spent: 10m 
      Work Description: kennknowles commented on a change in pull request #14419:
URL: https://github.com/apache/beam/pull/14419#discussion_r608248229



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
##########
@@ -224,6 +224,7 @@ public void trySplit(
       ProcessBundleSplitRequest request, ProcessBundleSplitResponse.Builder response) {
     DesiredSplit desiredSplit = request.getDesiredSplitsMap().get(pTransformId);
     if (desiredSplit == null) {
+      LOG.info("[BOYUANZ LOG] {}", request.getInstructionId());

Review comment:
       .

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1209,67 +1215,137 @@ public void setTimestampPolicy(String timestampPolicy) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      // The Read will be expanded into SDF transform when "beam_fn_api" is enabled.
-      if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          || ExperimentalOptions.hasExperiment(
+      // When reading from an unbounded source in a bounded manner, we do not go through Read or SDF.
+      if (ExperimentalOptions.hasExperiment(
               input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
           || getMaxNumRecords() < Long.MAX_VALUE
           || getMaxReadTime() != null) {
+        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
+      }
+      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
+    }
+
+    public static final PTransformOverride KAFKA_READ_OVERRIDE =

Review comment:
       Mark `@Internal` just to be clear that it is not for pipeline authors. Would be good to document why this exists and when to use it.
   
   Perhaps it could be in runners-core-construction, but I actually want to merge that back into the core SDK so no need.
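The suggestion above (mark the override `@Internal` and document its purpose) might look like the following sketch. Note this is a hedged illustration: Beam's real marker is `org.apache.beam.sdk.annotations.Internal` and the real override type is `org.apache.beam.sdk.runners.PTransformOverride`; both are stubbed here so the fragment compiles on its own.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class KafkaReadOverrideSketch {

  /** Stand-in for org.apache.beam.sdk.annotations.Internal. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.FIELD, ElementType.TYPE})
  @interface Internal {}

  /** Stand-in for org.apache.beam.sdk.runners.PTransformOverride. */
  static final class PTransformOverride {
    final String description;

    PTransformOverride(String description) {
      this.description = description;
    }
  }

  /**
   * Replaces the SDF-based Kafka read with the legacy UnboundedSource-based
   * expansion. Intended for runner authors whose runner does not support
   * Splittable DoFn; not for use by pipeline authors.
   */
  @Internal
  public static final PTransformOverride KAFKA_READ_OVERRIDE =
      new PTransformOverride("Replace KafkaIO.Read SDF expansion with legacy read");

  public static void main(String[] args) throws Exception {
    // Confirm via reflection that the override field carries the marker annotation.
    boolean marked =
        KafkaReadOverrideSketch.class
            .getField("KAFKA_READ_OVERRIDE")
            .isAnnotationPresent(Internal.class);
    System.out.println("KAFKA_READ_OVERRIDE marked @Internal: " + marked);
  }
}
```

The javadoc on the constant addresses the reviewer's second point: it records why the override exists and who should apply it.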




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 578013)
    Remaining Estimate: 0h
            Time Spent: 10m

> Eliminate beam_fn_api from KafkaIO expansion
> --------------------------------------------
>
>                 Key: BEAM-12114
>                 URL: https://issues.apache.org/jira/browse/BEAM-12114
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: P2
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> We are going to use the Splittable DoFn expansion by default for KafkaIO, without consulting the beam_fn_api flag. KafkaIO still provides overrides for runners that decide not to use this expansion. Pipeline authors can also use the beam_fn_api_use_deprecated_read experiment to switch to the old expansion.
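The expansion choice described above (and visible in the KafkaIO.java diff) can be sketched as a self-contained predicate. The class and method names here are hypothetical simplifications of Beam's `ExperimentalOptions.hasExperiment` and the condition in `KafkaIO.Read#expand`, not Beam's actual API:

```java
import java.util.Set;

public class KafkaExpansionChoice {

  /** Hypothetical stand-in for Beam's ExperimentalOptions.hasExperiment(...). */
  static boolean hasExperiment(Set<String> experiments, String name) {
    return experiments.contains(name);
  }

  /**
   * Mirrors the condition in the diff: the legacy UnboundedSource-based path
   * is chosen when the deprecated-read experiment is set, or when the read is
   * bounded (maxNumRecords / maxReadTime set); otherwise the SDF path is used.
   */
  static String chooseExpansion(Set<String> experiments, long maxNumRecords, Long maxReadTimeMs) {
    if (hasExperiment(experiments, "beam_fn_api_use_deprecated_read")
        || maxNumRecords < Long.MAX_VALUE
        || maxReadTimeMs != null) {
      return "ReadFromKafkaViaUnbounded";
    }
    return "ReadFromKafkaViaSDF";
  }

  public static void main(String[] args) {
    // Default: no experiments, unbounded read -> SDF expansion.
    System.out.println(chooseExpansion(Set.of(), Long.MAX_VALUE, null));
    // Opting out via the experiment flag -> legacy expansion.
    System.out.println(chooseExpansion(Set.of("beam_fn_api_use_deprecated_read"), Long.MAX_VALUE, null));
  }
}
```

Note that a bounded read (a finite `maxNumRecords` or a non-null `maxReadTime`) also forces the legacy path, independent of any experiment flag.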



--
This message was sent by Atlassian Jira
(v8.3.4#803005)