Posted to issues@flink.apache.org by "lindong28 (via GitHub)" <gi...@apache.org> on 2023/04/16 14:23:19 UTC

[GitHub] [flink-ml] lindong28 commented on a diff in pull request #215: [FLINK-31160] Support join/cogroup in BroadcastUtils.withBroadcastStream

lindong28 commented on code in PR #215:
URL: https://github.com/apache/flink-ml/pull/215#discussion_r1167928810


##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/operator/AbstractBroadcastWrapperOperator.java:
##########
@@ -169,83 +144,39 @@
                                         parameters.getOperatorEventDispatcher())
                                 .f0;
 
+        this.indexOfSubtask = containingTask.getIndexInSubtaskGroup();
+        this.numInputs = inTypes.length;
+
         boolean hasRichFunction =
                 wrappedOperator instanceof AbstractUdfStreamOperator
                         && ((AbstractUdfStreamOperator) wrappedOperator).getUserFunction()
                                 instanceof RichFunction;
-
-        if (hasRichFunction) {
-            wrappedOperatorRuntimeContext =
-                    new BroadcastStreamingRuntimeContext(
-                            containingTask.getEnvironment(),
-                            containingTask.getEnvironment().getAccumulatorRegistry().getUserMap(),
-                            wrappedOperator.getMetricGroup(),
-                            wrappedOperator.getOperatorID(),
-                            ((AbstractUdfStreamOperator) wrappedOperator)
-                                    .getProcessingTimeService(),
-                            null,
-                            containingTask.getEnvironment().getExternalResourceInfoProvider());
-
-            ((RichFunction) ((AbstractUdfStreamOperator) wrappedOperator).getUserFunction())
-                    .setRuntimeContext(wrappedOperatorRuntimeContext);
-        } else {
-            throw new RuntimeException(
-                    "The operator is not a instance of "
-                            + AbstractUdfStreamOperator.class.getSimpleName()
-                            + " that contains a "
-                            + RichFunction.class.getSimpleName());
-        }
-
-        this.mailboxExecutor =
-                containingTask.getMailboxExecutorFactory().createExecutor(TaskMailbox.MIN_PRIORITY);
-        // variables specific for withBroadcast functionality.
-        this.broadcastStreamNames = broadcastStreamNames;
-        this.isBlocked = isBlocked;
-        this.inTypes = inTypes;
-        this.broadcastVariablesReady = false;
-        this.indexOfSubtask = containingTask.getIndexInSubtaskGroup();
-        this.numInputs = inTypes.length;
-
-        // puts in mailboxExecutor
-        for (String name : broadcastStreamNames) {
-            BroadcastContext.putMailBoxExecutor(name + "-" + indexOfSubtask, mailboxExecutor);
-        }
-
-        basePath =
-                OperatorUtils.getDataCachePath(
-                        containingTask.getEnvironment().getTaskManagerInfo().getConfiguration(),
-                        containingTask
-                                .getEnvironment()
-                                .getIOManager()
-                                .getSpillingDirectoriesPaths());
-        dataCacheWriters = new DataCacheWriter[numInputs];
-        hasPendingElements = new boolean[numInputs];
-        Arrays.fill(hasPendingElements, true);
+        richContext = new RichContext(hasRichFunction, broadcastStreamNames, inTypes, isBlocked);

Review Comment:
   What is the benefit of moving these variables to `richContext`?
   
   If the goal is readability, could we just add a section comment such as `// ---------------- rich function context ------------------`, similar to what `AbstractStreamOperator.java` does [1]?
   
   The name `RichContext` does not seem intuitive. Maybe add comments that explain why these variables belong to the same "group".
   
   And if we do need to move these variables into `RichContext`, it might be better to put this non-trivial refactoring into a dedicated commit.
   
   [1] https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L104
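   For illustration, the section-comment alternative might look roughly like the sketch below. `BroadcastWrapperFieldsSketch` is a hypothetical, heavily simplified class, not the real operator; the field names are taken from the diff, while the Javadoc on each field is an assumption about its meaning.

   ```java
   import java.util.Arrays;

   // Hypothetical, simplified stand-in for AbstractBroadcastWrapperOperator: only the
   // grouping of the withBroadcast-specific fields under a section comment is shown.
   class BroadcastWrapperFieldsSketch {

       // ---------------- withBroadcast-specific fields ------------------

       /** Whether the wrapped operator holds a RichFunction (assumed meaning). */
       final boolean hasRichFunction;

       /** Names of the broadcast streams to wait for. */
       final String[] broadcastStreamNames;

       /** Whether each non-broadcast input is blocked until broadcast variables are ready. */
       final boolean[] isBlocked;

       /** Whether each non-broadcast input may still have cached elements to replay. */
       final boolean[] hasPendingElements;

       /** Becomes true once all broadcast variables have been received. */
       boolean broadcastVariablesReady;

       BroadcastWrapperFieldsSketch(
               boolean hasRichFunction, String[] broadcastStreamNames, boolean[] isBlocked) {
           this.hasRichFunction = hasRichFunction;
           this.broadcastStreamNames = broadcastStreamNames;
           this.isBlocked = isBlocked;
           // Mirrors the removed code: every input starts out with (possibly) pending elements.
           this.hasPendingElements = new boolean[isBlocked.length];
           Arrays.fill(hasPendingElements, true);
           this.broadcastVariablesReady = false;
       }
   }
   ```

   Keeping the fields on the operator itself avoids the extra `richContext.` indirection at every use site, at the cost of a longer flat field list.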



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/operator/AbstractBroadcastWrapperOperator.java:
##########
@@ -269,138 +200,162 @@ private OperatorMetricGroup createOperatorMetricGroup(
     }
 
     /**
-     * extracts common processing logic in subclasses' processing elements.
+     * Extracts common processing logic in subclasses' processing elements.
      *
-     * @param streamRecord the input record.
-     * @param inputIndex input id, starts from zero.
-     * @param elementConsumer the consumer function of StreamRecord, i.e.,
+     * @param streamRecord The input record.
+     * @param inputIndex Input id, starts from zero.
+     * @param elementConsumer The consumer function of StreamRecord, i.e.,
      *     operator.processElement(...).
-     * @param watermarkConsumer the consumer function of WaterMark, i.e.,
+     * @param watermarkConsumer The consumer function of WaterMark, i.e.,
      *     operator.processWatermark(...).
-     * @throws Exception possible exception.
+     * @param keyContextSetter The consumer function of setting key context, i.e.,
+     *     operator.setKeyContext(...).
+     * @throws Exception Possible exception.
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
     protected void processElementX(
             StreamRecord streamRecord,
             int inputIndex,
             ThrowingConsumer<StreamRecord, Exception> elementConsumer,
-            ThrowingConsumer<Watermark, Exception> watermarkConsumer)
+            ThrowingConsumer<Watermark, Exception> watermarkConsumer,
+            ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
             throws Exception {
-        if (!isBlocked[inputIndex]) {
-            if (areBroadcastVariablesReady()) {
-                if (hasPendingElements[inputIndex]) {
-                    processPendingElementsAndWatermarks(
-                            inputIndex, elementConsumer, watermarkConsumer);
-                    hasPendingElements[inputIndex] = false;
+        if (richContext.hasRichFunction) {
+            if (!richContext.isBlocked[inputIndex]) {
+                if (areBroadcastVariablesReady()) {
+                    if (richContext.hasPendingElements[inputIndex]) {
+                        processPendingElementsAndWatermarks(
+                                inputIndex, elementConsumer, watermarkConsumer, keyContextSetter);
+                        richContext.hasPendingElements[inputIndex] = false;
+                        keyContextSetter.accept(streamRecord);

Review Comment:
   Should we call `keyContextSetter.accept(streamRecord)` right above the following `elementConsumer.accept(streamRecord)` instead?
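   The ordering matters because replaying the cached elements presumably sets the key context for each replayed record, overwriting the key of the current record. The hypothetical `KeyContextOrderingSketch` below models the key context as a single field (using each record as its own key) to show that restoring the key immediately before processing the current record yields the right key whether or not a replay happened. It is a simplified model, not Flink's API.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical model: the operator's key context is a single "current key" field, and each
   // record is used as its own key to keep the example small.
   class KeyContextOrderingSketch {
       private String currentKey;
       // Records "record@keyInEffect" for every processed record.
       final List<String> processed = new ArrayList<>();

       void setKeyContext(String record) {
           currentKey = record;
       }

       void processElement(String record) {
           processed.add(record + "@" + currentKey);
       }

       // Replaying cached records sets the key for each of them, overwriting the current key.
       private void replay(List<String> pending) {
           for (String r : pending) {
               setKeyContext(r);
               processElement(r);
           }
       }

       // Key set only before the replay (e.g. by the caller): the current record sees a stale key.
       void setKeyBeforeReplay(List<String> pending, String record) {
           setKeyContext(record);
           replay(pending);
           processElement(record);
       }

       // Key (re)set right before processing the current record: correct with or without a replay.
       void setKeyRightBeforeProcessing(List<String> pending, String record) {
           replay(pending);
           setKeyContext(record);
           processElement(record);
       }
   }
   ```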



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/operator/AbstractBroadcastWrapperOperator.java:
##########
@@ -468,7 +425,9 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
 
         timeServiceManager = streamOperatorStateContext.internalTimerServiceManager();
 
-        broadcastVariablesReady = false;
+        if (richContext.hasRichFunction) {
+            richContext.broadcastVariablesReady = false;

Review Comment:
   Would it be more readable to explicitly initialize `broadcastVariablesReady` when `hasRichFunction == false`?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/operator/AbstractBroadcastWrapperOperator.java:
##########
@@ -269,138 +200,162 @@ private OperatorMetricGroup createOperatorMetricGroup(
     }
 
     /**
-     * extracts common processing logic in subclasses' processing elements.
+     * Extracts common processing logic in subclasses' processing elements.
      *
-     * @param streamRecord the input record.
-     * @param inputIndex input id, starts from zero.
-     * @param elementConsumer the consumer function of StreamRecord, i.e.,
+     * @param streamRecord The input record.
+     * @param inputIndex Input id, starts from zero.
+     * @param elementConsumer The consumer function of StreamRecord, i.e.,
      *     operator.processElement(...).
-     * @param watermarkConsumer the consumer function of WaterMark, i.e.,
+     * @param watermarkConsumer The consumer function of WaterMark, i.e.,
      *     operator.processWatermark(...).
-     * @throws Exception possible exception.
+     * @param keyContextSetter The consumer function of setting key context, i.e.,
+     *     operator.setKeyContext(...).
+     * @throws Exception Possible exception.
      */
     @SuppressWarnings({"rawtypes", "unchecked"})
     protected void processElementX(
             StreamRecord streamRecord,
             int inputIndex,
             ThrowingConsumer<StreamRecord, Exception> elementConsumer,
-            ThrowingConsumer<Watermark, Exception> watermarkConsumer)
+            ThrowingConsumer<Watermark, Exception> watermarkConsumer,
+            ThrowingConsumer<StreamRecord, Exception> keyContextSetter)
             throws Exception {
-        if (!isBlocked[inputIndex]) {
-            if (areBroadcastVariablesReady()) {
-                if (hasPendingElements[inputIndex]) {
-                    processPendingElementsAndWatermarks(
-                            inputIndex, elementConsumer, watermarkConsumer);
-                    hasPendingElements[inputIndex] = false;
+        if (richContext.hasRichFunction) {

Review Comment:
   Four nested `if` statements are hard to read.
   
   How about using something like this:
   
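   ```java
   if (!richContext.hasRichFunction) {
       // No rich function, hence no broadcast variables to wait for.
       elementConsumer.accept(streamRecord);
   } else if (richContext.isBlocked[inputIndex]) {
       // Blocked input: yield to the mailbox until the broadcast variables are ready.
       while (!areBroadcastVariablesReady()) {
           richContext.mailboxExecutor.yield();
       }
       elementConsumer.accept(streamRecord);
   } else if (!areBroadcastVariablesReady()) {
       // Non-blocked input before the broadcast variables arrive: cache the record.
       richContext.dataCacheWriters[inputIndex].addRecord(
               CacheElement.newRecord(streamRecord.getValue()));
   } else {
       // Broadcast variables are ready: replay the cached elements once, then restore
       // the key context of the current record before processing it.
       if (richContext.hasPendingElements[inputIndex]) {
           processPendingElementsAndWatermarks(
                   inputIndex, elementConsumer, watermarkConsumer, keyContextSetter);
           richContext.hasPendingElements[inputIndex] = false;
           keyContextSetter.accept(streamRecord);
       }
       elementConsumer.accept(streamRecord);
   }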
   ```
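   A self-contained model of the suggested flat `if`/`else if` chain is sketched below. `FlatDispatchSketch` is hypothetical: records are plain `String`s, the per-input data cache is an in-memory list instead of a `DataCacheWriter`, the mailbox `yield()` is modeled as a `Runnable`, and watermarks and key context are omitted. It only illustrates that each branch handles exactly one case, so the nesting disappears.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.function.BooleanSupplier;
   import java.util.function.Consumer;

   // Hypothetical, simplified model of the suggested processElementX control flow.
   class FlatDispatchSketch {
       private final boolean hasRichFunction;
       private final boolean[] isBlocked;
       private final boolean[] hasPendingElements;
       // Hypothetical in-memory replacement for the per-input DataCacheWriter.
       private final List<List<String>> caches = new ArrayList<>();
       private final BooleanSupplier broadcastVariablesReady;
       // Hypothetical replacement for mailboxExecutor.yield().
       private final Runnable yieldToMailbox;

       FlatDispatchSketch(
               boolean hasRichFunction,
               boolean[] isBlocked,
               BooleanSupplier broadcastVariablesReady,
               Runnable yieldToMailbox) {
           this.hasRichFunction = hasRichFunction;
           this.isBlocked = isBlocked;
           this.hasPendingElements = new boolean[isBlocked.length];
           Arrays.fill(hasPendingElements, true);
           for (int i = 0; i < isBlocked.length; i++) {
               caches.add(new ArrayList<>());
           }
           this.broadcastVariablesReady = broadcastVariablesReady;
           this.yieldToMailbox = yieldToMailbox;
       }

       void processElementX(String record, int inputIndex, Consumer<String> elementConsumer) {
           if (!hasRichFunction) {
               // Nothing to wait for: process directly.
               elementConsumer.accept(record);
           } else if (isBlocked[inputIndex]) {
               // Blocked input: yield until the broadcast variables are ready.
               while (!broadcastVariablesReady.getAsBoolean()) {
                   yieldToMailbox.run();
               }
               elementConsumer.accept(record);
           } else if (!broadcastVariablesReady.getAsBoolean()) {
               // Non-blocked input, variables not ready: cache the record for later.
               caches.get(inputIndex).add(record);
           } else {
               // Variables ready: replay the cache once, then process the current record.
               if (hasPendingElements[inputIndex]) {
                   caches.get(inputIndex).forEach(elementConsumer);
                   caches.get(inputIndex).clear();
                   hasPendingElements[inputIndex] = false;
               }
               elementConsumer.accept(record);
           }
       }
   }
   ```

   For example, with input 0 non-blocked, records arriving before the variables are ready are only cached, and the first record after readiness flushes them in arrival order ahead of itself.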



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java:
##########
@@ -152,41 +155,52 @@ private static <OUT> DataStream<OUT> cacheBroadcastVariables(
     }
 
     /**
-     * uses {@link DraftExecutionEnvironment} to execute the userDefinedFunction and returns the
+     * Uses {@link DraftExecutionEnvironment} to execute the userDefinedFunction and returns the
      * resultStream.
      *
-     * @param env execution environment.
-     * @param inputList non-broadcast input list.
-     * @param broadcastStreamNames names of the broadcast data streams.
-     * @param graphBuilder user-defined logic.
-     * @param <OUT> output type of the result stream.
-     * @return the result stream by applying user-defined logic on the input list.
+     * @param env Execution environment.
+     * @param inputList Non-broadcast input list.
+     * @param broadcastStreamNames Names of the broadcast data streams.
+     * @param graphBuilder User-defined logic.
+     * @param <OUT> Output type of the result stream.
+     * @return The result stream by applying user-defined logic on the input list.
      */
     private static <OUT> DataStream<OUT> getResultStream(
             StreamExecutionEnvironment env,
             List<DataStream<?>> inputList,
             String[] broadcastStreamNames,
             Function<List<DataStream<?>>, DataStream<OUT>> graphBuilder) {
-        TypeInformation<?>[] inTypes = new TypeInformation[inputList.size()];
-        for (int i = 0; i < inputList.size(); i++) {
-            inTypes[i] = inputList.get(i).getType();
-        }
-        // do not block all non-broadcast input edges by default.
-        boolean[] isBlocked = new boolean[inputList.size()];
-        Arrays.fill(isBlocked, false);
+
+        // Executes the graph builder and gets real non-broadcast inputs.
         DraftExecutionEnvironment draftEnv =
-                new DraftExecutionEnvironment(
-                        env, new BroadcastWrapper<>(broadcastStreamNames, inTypes, isBlocked));
+                new DraftExecutionEnvironment(env, new DefaultWrapper<>());
 
         List<DataStream<?>> draftSources = new ArrayList<>();
         for (DataStream<?> dataStream : inputList) {
             draftSources.add(draftEnv.addDraftSource(dataStream, dataStream.getType()));
         }
         DataStream<OUT> draftOutStream = graphBuilder.apply(draftSources);
-        Preconditions.checkState(
-                draftEnv.getStreamGraph(false).getStreamNodes().size() == 1 + inputList.size(),
-                "cannot add more than one operator in withBroadcastStream's lambda function.");
-        draftEnv.copyToActualEnvironment();
-        return draftEnv.getActualStream(draftOutStream.getId());
+
+        List<Transformation<?>> realNonBroadcastInputs =
+                draftOutStream.getTransformation().getInputs();

Review Comment:
   Can you explain why the inputs of the transformation of the DataStream generated by `graphBuilder` are equivalent to the non-broadcast elements in the original `inputList`?
   
   Note that `graphBuilder` can include multiple join/map operators, so the inputs of the DataStream it computes might not even be a subset of the original `inputList`.
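   A toy graph makes the concern concrete. In the sketch below, `TransformationNode` is a hypothetical stand-in for a `Transformation` (a node plus its direct upstream inputs). When the builder applies a map and then a join, the direct inputs of the output node are intermediate nodes, not the original inputs; walking upstream to the source nodes recovers the originals. The upstream walk is only one possible way to relate the output back to `inputList`, not what the PR does.

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical stand-in for a Transformation: a named node and its direct upstream inputs.
   final class TransformationNode {
       final String name;
       final List<TransformationNode> inputs;

       TransformationNode(String name, TransformationNode... inputs) {
           this.name = name;
           this.inputs = Arrays.asList(inputs);
       }

       // Collects the nodes without inputs (sources) reachable upstream from this node.
       Set<TransformationNode> upstreamSources() {
           Set<TransformationNode> sources = new LinkedHashSet<>();
           collectSources(this, sources);
           return sources;
       }

       private static void collectSources(TransformationNode node, Set<TransformationNode> acc) {
           if (node.inputs.isEmpty()) {
               acc.add(node);
               return;
           }
           for (TransformationNode input : node.inputs) {
               collectSources(input, acc);
           }
       }
   }
   ```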


