Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/12 00:50:33 UTC

[GitHub] [beam] reuvenlax commented on a change in pull request #15465: [BEAM-12795] When using schemas, allow state/timers use without KV objects.

reuvenlax commented on a change in pull request #15465:
URL: https://github.com/apache/beam/pull/15465#discussion_r726681098



##########
File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EnvironmentsTest.java
##########
@@ -249,7 +238,7 @@ public void process(ProcessContext ctxt) {}
     Environment env1 = Environments.getEnvironment(ptransform, rehydratedComponents).get();
     assertThat(
         env1,
-        equalTo(components.toComponents().getEnvironmentsOrThrow(ptransform.getEnvironmentId())));
+        equalTo(components.toComponents().getEnvironmentsOrThrow(ptransform.getEnvironmentId())));*/

Review comment:
       No - fixed

##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
##########
@@ -100,7 +101,8 @@
               windowingStrategy,
               doFnSchemaInformation,
               sideInputMapping);
-      if (DoFnSignatures.signatureForDoFn(fn).usesState()) {
+      DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
+      if (signature.usesState() || signature.onWindowExpiration() != null) {

Review comment:
       @lukecwik would you prefer this be in a separate PR? It arguably fixes a separable bug.
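
For context, a minimal sketch of the kind of DoFn this fixes: one that declares @OnWindowExpiration but no state, which the old usesState()-only check would not route through the stateful evaluator. The class and identifiers here are hypothetical:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical DoFn: no @StateId declarations, so signature.usesState() is
    // false, yet @OnWindowExpiration still needs the stateful execution path.
    class ExpiryOnlyFn extends DoFn<KV<String, Long>, Long> {
      @ProcessElement
      public void process(@Element KV<String, Long> element) {
        // No state or timers used while processing elements.
      }

      @OnWindowExpiration
      public void onExpiry(OutputReceiver<Long> output) {
        // Fires when the window expires; without the check above, the direct
        // runner would never schedule this callback.
        output.output(0L);
      }
    }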

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
##########
@@ -142,69 +144,111 @@ private MultiOutputOverrideFactory() {}
     }
   }
 
-  static class StatefulSingleOutputParDo<K, InputT, OutputT>
-      extends PTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>> {
+  static class StatefulSingleOutputParDo<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
-    private final ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo;
+    private final ParDo.SingleOutput<InputT, OutputT> originalParDo;
 
-    StatefulSingleOutputParDo(ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo) {
+    StatefulSingleOutputParDo(ParDo.SingleOutput<InputT, OutputT> originalParDo) {
       this.originalParDo = originalParDo;
     }
 
-    ParDo.SingleOutput<KV<K, InputT>, OutputT> getOriginalParDo() {
+    ParDo.SingleOutput<InputT, OutputT> getOriginalParDo() {
       return originalParDo;
     }
 
     @Override
-    public PCollection<OutputT> expand(PCollection<KV<K, InputT>> input) {
-      DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
+    @SuppressWarnings({"rawtypes"})
+    public PCollection<OutputT> expand(PCollection<InputT> input) {
+      DoFn fn = originalParDo.getFn();
       verifyFnIsStateful(fn);
       DataflowPipelineOptions options =
           input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
       DataflowRunner.verifyDoFnSupported(fn, false, DataflowRunner.useStreamingEngine(options));
       DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
 
-      PTransform<
-              PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
-              PCollection<OutputT>>
+      PCollection keyedInput = input;
+      // ParDo does this in ParDo.MultiOutput.expand. However, since we're replacing
+      // ParDo.SingleOutput, the results of the initial expansion of ParDo.MultiOutput
+      // are thrown away, so we need to add the key back in.
+      DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+      @Nullable FieldAccessDescriptor keyFieldAccess = originalParDo.getKeyFieldsDescriptor();
+      if (keyFieldAccess != null) {
+        if (!input.hasSchema()) {
+          throw new IllegalArgumentException(
+              "Cannot specify a @StateKeyFields if not using a schema");
+        }
+        keyedInput = input.apply("Extract schema keys", ParDo.getWithSchemaKeys(keyFieldAccess));
+      }
+      return continueExpand(keyedInput, fn);
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    public <K, V> PCollection<OutputT> continueExpand(PCollection<KV<K, V>> input, DoFn fn) {
+      ParDo.SingleOutput<KV<K, Iterable<KV<Instant, WindowedValue<KV<K, V>>>>>, OutputT>
           statefulParDo =
-              ParDo.of(new BatchStatefulDoFn<>(fn)).withSideInputs(originalParDo.getSideInputs());
+              ParDo.of(new BatchStatefulDoFn<K, V, OutputT>(fn))
+                  .withSideInputs(originalParDo.getSideInputs());
 
       return input.apply(new GbkBeforeStatefulParDo<>()).apply(statefulParDo);
     }
   }
 
-  static class StatefulMultiOutputParDo<K, InputT, OutputT>
-      extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> {
+  static class StatefulMultiOutputParDo<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionTuple> {
 
-    private final ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo;
+    private final ParDo.MultiOutput<InputT, OutputT> originalParDo;
 
-    StatefulMultiOutputParDo(ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo) {
+    StatefulMultiOutputParDo(ParDo.MultiOutput<InputT, OutputT> originalParDo) {
       this.originalParDo = originalParDo;
     }
 
     @Override
-    public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
-      DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
+    @SuppressWarnings({"rawtypes"})
+    public PCollectionTuple expand(PCollection<InputT> input) {
+      DoFn fn = originalParDo.getFn();
       verifyFnIsStateful(fn);
       DataflowPipelineOptions options =
           input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
       DataflowRunner.verifyDoFnSupported(fn, false, DataflowRunner.useStreamingEngine(options));
       DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
 
-      PTransform<
-              PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
-              PCollectionTuple>
+      PCollection keyedInput = input;
+      // ParDo does this in ParDo.MultiOutput.expand. However, since we're replacing
+      // ParDo.MultiOutput, the results of the initial expansion of ParDo.MultiOutput
+      // are thrown away, so we need to add the key back in.

Review comment:
       done

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
##########
@@ -224,10 +268,11 @@ public PCollectionTuple expand(PCollection<KV<K, InputT>> input) {
       // is not registered by default, so we explicitly set the relevant coders.
       checkState(
           input.getCoder() instanceof KvCoder,
-          "Input to a %s using state requires a %s, but the coder was %s",
+          "Input to a %s using state requires a %s, but the coder was %s. PColleciton %s",

Review comment:
       done

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
##########
@@ -329,7 +329,12 @@ public void processElement(Object untypedElem) throws Exception {
 
     WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
 
-    if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
+    // We use the state-cleanup timer to implement onWindowExpiration, so make sure to set it if

Review comment:
       done
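
The hunk above is cut off mid-comment. Presumably the guard mirrors the ParDoEvaluator change earlier in this review; a hedged reconstruction, an assumption rather than the actual diff:

    // Assumed shape of the updated guard: the state-cleanup timer must also be
    // set when only @OnWindowExpiration is declared, since that timer is what
    // triggers the callback.
    if (fnSignature != null
        && (fnSignature.stateDeclarations().size() > 0
            || fnSignature.onWindowExpiration() != null)) {
      // ... register the state-cleanup timer for the element's windows ...
    }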

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -396,6 +396,12 @@ public Duration getAllowedTimestampSkew() {
     String value();
   }
 
+  @Documented

Review comment:
       done
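
The hunk shows only the @Documented line of what is presumably the new @StateKeyFields annotation (the name appears in the BatchStatefulParDoOverrides hunk above). A hedged usage sketch of what this PR enables, state without KV wrapping on a schema PCollection; the annotation's placement and element type here are assumptions:

    import org.apache.beam.sdk.schemas.JavaFieldSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;

    @DefaultSchema(JavaFieldSchema.class)
    class Purchase {
      public String userId;
      public long amountCents;
    }

    // Hedged: @StateKeyFields("userId") keys the state by a schema field, so
    // the input stays PCollection<Purchase> rather than PCollection<KV<...>>.
    // Whether the annotation goes on the class, method, or parameter is a guess.
    @StateKeyFields("userId")
    class CountPerUser extends DoFn<Purchase, Long> {
      @StateId("count")
      private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();

      @ProcessElement
      public void process(
          @StateId("count") ValueState<Long> count, OutputReceiver<Long> output) {
        Long current = count.read();
        long updated = (current == null ? 0L : current) + 1;
        count.write(updated);
        output.output(updated);
      }
    }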

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -2449,7 +2467,13 @@ public void output(OutputT output) {
 
     @Override
     public InputT element() {
-      return currentElement.getValue();
+      if (doFnSchemaInformation.getKeyFieldsDescriptor() != null) {

Review comment:
       Yeah, that makes sense and is similar to what we do with WindowObservingProcessBundleContext. Do you think it also makes sense to memoize things? (e.g., the schema key accessor currently parses the key out of the record every time it is called)
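
On the memoization point, a minimal self-contained sketch (names hypothetical, not the runner's actual fields) of caching the extracted key so the selector runs at most once per element() call:

    import java.util.function.Function;

    // Hypothetical: wraps an element and lazily computes its schema key once,
    // instead of re-parsing it on every access.
    final class MemoizedKeyedElement<T, K> {
      private final T element;
      private final Function<T, K> keyExtractor;
      private K cachedKey; // null until first accessed

      MemoizedKeyedElement(T element, Function<T, K> keyExtractor) {
        this.element = element;
        this.keyExtractor = keyExtractor;
      }

      K key() {
        if (cachedKey == null) {
          cachedKey = keyExtractor.apply(element); // parsed only on first call
        }
        return cachedKey;
      }
    }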

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
##########
@@ -1015,12 +1303,129 @@ public TupleTagList getAdditionalOutputTags() {
       return PCollectionViews.toAdditionalInputs(sideInputs.values());
     }
 
+    public FieldAccessDescriptor getKeyFieldsDescriptor() {
+      return keyFieldsDescriptor;
+    }
+
+    @Internal
+    public DoFnSchemaInformation getDoFnSchemaInformation() {
+      return doFnSchemaInformation;
+    }
+
     @Override
     public String toString() {
       return fn.toString();
     }
   }
 
+  public static <T> PTransform<PCollection<T>, PCollection<KV<Row, T>>> getWithSchemaKeys(
+      FieldAccessDescriptor fieldAccessDescriptor) {
+    return new SchemaToKv<>(fieldAccessDescriptor);
+  }
+
+  private static class SchemaToKv<T> extends PTransform<PCollection<T>, PCollection<KV<Row, T>>> {
+    private RowSelector rowSelector;
+    private final FieldAccessDescriptor fieldAccessDescriptor;
+
+    SchemaToKv(FieldAccessDescriptor fieldAccessDescriptor) {
+      this.fieldAccessDescriptor = fieldAccessDescriptor;
+    }
+
+    @Override
+    public PCollection<KV<Row, T>> expand(PCollection<T> input) {
+      Schema schema = input.getSchema();
+      TypeDescriptor<T> typeDescriptor = input.getTypeDescriptor();
+      SerializableFunction<T, Row> toRowFunction = input.getToRowFunction();
+      SerializableFunction<Row, T> fromRowFunction = input.getFromRowFunction();
+
+      FieldAccessDescriptor resolved = fieldAccessDescriptor.resolve(schema);
+      rowSelector = new RowSelectorContainer(schema, resolved, true);
+      Schema keySchema = SelectHelpers.getOutputSchema(schema, resolved);
+
+      return input
+          .apply(
+              "selectKeys",
+              ParDo.of(
+                  new DoFn<T, KV<Row, T>>() {
+                    @ProcessElement
+                    public void process(
+                        @Element Row row, // Beam will convert the element to a row.
+                        @Element T element, // Beam will return the original element.
+                        OutputReceiver<KV<Row, T>> o) {
+                      o.output(KV.of(rowSelector.select(row), element));
+                    }
+                  }))
+          .setCoder(
+              KvCoder.of(
+                  SchemaCoder.of(keySchema),
+                  SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction)));
+    }
+  }
+
+  public static class MultiOutputSchemaKeyFields<InputT, DoFnInputT, OutputT>

Review comment:
       this would help somewhat, but not completely. we would still need to register separate transform translators, as those are based off of the class itself, not instanceof.
   
   This would simply translation logic a bit, but it comes with a typing challenge. MultiOutput today assumes that the DoFn input type is the same as the PCollection input type. In MultiOutputSchemaKeyFields this is not the case - the PCollection input is a KV while the DoFn input is the original type. Currently this blocks me making one.a subclass of the other unless I do some strange finagling of types
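
To make the typing challenge concrete, hypothetical stubs (the -Like names are illustrative, not Beam classes) showing why the two transforms resist a subclass relationship:

    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.Row;

    // MultiOutput ties the PCollection input and the DoFn input to the same
    // type variable: it expands PCollection<InputT> with a DoFn<InputT, OutputT>.
    abstract class MultiOutputLike<InputT, OutputT>
        extends PTransform<PCollection<InputT>, PCollectionTuple> {}

    // The schema-key variant consumes KV<Row, DoFnInputT> while its DoFn still
    // consumes DoFnInputT; subclassing MultiOutputLike would force
    // InputT = KV<Row, DoFnInputT> and InputT = DoFnInputT simultaneously.
    abstract class MultiOutputSchemaKeyFieldsLike<DoFnInputT, OutputT>
        extends PTransform<PCollection<KV<Row, DoFnInputT>>, PCollectionTuple> {}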

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoSchemaTest.java
##########
@@ -681,30 +696,349 @@ public void processElement(
           }
         };
 
+    TupleTag<Row> mainTag = new TupleTag<>();
     PCollection<Row> output =
         pipeline
             .apply(
+                "Create values",
                 Create.of(
-                    KV.of("hello", Row.withSchema(type).addValue("a").build()),
-                    KV.of("hello", Row.withSchema(type).addValue("b").build()),
-                    KV.of("hello", Row.withSchema(type).addValue("c").build()),
-                    KV.of("hello", Row.withSchema(type).addValue("d").build())))
-            .apply(ParDo.of(fn))
+                        Row.withSchema(type).addValues("a", "hello").build(),
+                        Row.withSchema(type).addValues("b", "hello").build(),
+                        Row.withSchema(type).addValues("c", "hello").build(),
+                        Row.withSchema(type).addValues("d", "hello").build())
+                    .withRowSchema(type))
+            .apply("run statetful fn", ParDo.of(fn).withOutputTags(mainTag, TupleTagList.empty()))

Review comment:
       done

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
##########
@@ -431,22 +436,37 @@ private static void finishSpecifyingStateSpecs(
     }
   }
 
-  private static void validateStateApplicableForInput(DoFn<?, ?> fn, PCollection<?> input) {
+  private static void validateStateApplicableForInput(
+      DoFn<?, ?> fn, PCollection<?> input, @Nullable FieldAccessDescriptor fieldAccessDescriptor) {
     Coder<?> inputCoder = input.getCoder();
-    checkArgument(
-        inputCoder instanceof KvCoder,
-        "%s requires its input to use %s in order to use state and timers.",
-        ParDo.class.getSimpleName(),
-        KvCoder.class.getSimpleName());
+    if (fieldAccessDescriptor == null) {
+      checkArgument(
+          inputCoder instanceof KvCoder,
+          "%s requires its input to either use %s or have a schema input in order to use state and timers.",

Review comment:
       done



