Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/07 03:11:10 UTC

[GitHub] [flink-ml] gaoyunhaii opened a new pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

gaoyunhaii opened a new pull request #17:
URL: https://github.com/apache/flink-ml/pull/17


   This PR implements the checkpoint mechanism for the iteration. The goals of the checkpoint mechanism are to ensure that
   1. Record processing is consistent with the state, the same as for normal checkpoints without feedback edges.
   2. The notification of epoch increments is exactly-once.
   
   The checkpoints rely on the reference counting mechanism to include the feedback records in the snapshots. Besides, they also take care of the state for the controller operator, the all-round wrappers and the per-round wrappers. This version introduces a limitation: it does not allow any operator inside the iteration to change parallelism after restarting from a checkpoint. In the long run, the condition could be relaxed so that
   1. Unbounded iterations could rescale freely.
   2. Bounded iterations could rescale all-round / per-round operators if they are restored from a savepoint / external checkpoint taken after round 0 is fully finished. However, the controller operators (Head & Tail) could not support rescaling.
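
   To make the reference counting described above concrete, here is a minimal, hypothetical Java sketch of buffering feedback records per checkpoint and releasing them once all contributors have finished. The names and structure are illustrative only and are not taken from the actual flink-ml implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative only: a reference-counted buffer of in-flight feedback records per checkpoint.
 * A reference is added when a participant (e.g. the head operator) starts contributing to the
 * checkpoint and released when it is done; once the count drops to zero, the buffered records
 * are handed out to be written into the snapshot.
 */
final class FeedbackSnapshotBuffer<T> {

    private static final class Pending<T> {
        final List<T> records = new ArrayList<>();
        int referenceCount;
    }

    private final Map<Long, Pending<T>> pendingByCheckpoint = new HashMap<>();

    /** Registers one more contributor for the given checkpoint. */
    synchronized void addReference(long checkpointId) {
        pendingByCheckpoint.computeIfAbsent(checkpointId, id -> new Pending<>()).referenceCount++;
    }

    /** Buffers a feedback record that logically belongs to the in-flight checkpoint. */
    synchronized void append(long checkpointId, T record) {
        Pending<T> pending = pendingByCheckpoint.get(checkpointId);
        if (pending != null) {
            pending.records.add(record);
        }
    }

    /** Releases one reference; returns the buffered records once no contributor remains, else null. */
    synchronized List<T> release(long checkpointId) {
        Pending<T> pending = pendingByCheckpoint.get(checkpointId);
        if (pending == null || --pending.referenceCount > 0) {
            return null;
        }
        pendingByCheckpoint.remove(checkpointId);
        return pending.records;
    }
}
```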


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-ml] gaoyunhaii commented on a change in pull request #17: [FLINK-24655][iteration] Support the checkpoints for the iteration

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r741846984



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
##########
@@ -53,13 +53,14 @@ public void processElement(StreamRecord<T> streamRecord) throws Exception {
 
     @Override
     public void endInput() throws Exception {
-        if (insertMaxEpochWatermark) {
-            reusable.replace(
-                    IterationRecord.newEpochWatermark(
-                            Integer.MAX_VALUE,
-                            OperatorUtils.getUniqueSenderId(
-                                    getOperatorID(), getRuntimeContext().getIndexOfThisSubtask())));
-            output.collect(reusable);
-        }
+        //        if (insertMaxEpochWatermark) {

Review comment:
       I'll remove this code~







[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r740035659



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
##########
@@ -53,13 +53,14 @@ public void processElement(StreamRecord<T> streamRecord) throws Exception {
 
     @Override
     public void endInput() throws Exception {
-        if (insertMaxEpochWatermark) {
-            reusable.replace(
-                    IterationRecord.newEpochWatermark(
-                            Integer.MAX_VALUE,
-                            OperatorUtils.getUniqueSenderId(
-                                    getOperatorID(), getRuntimeContext().getIndexOfThisSubtask())));
-            output.collect(reusable);
-        }
+        //        if (insertMaxEpochWatermark) {

Review comment:
       I think it might be better to remove the unused code. WDYT?







[GitHub] [flink-ml] gaoyunhaii commented on pull request #17: [FLINK-24655][iteration] Support the checkpoints for the iteration

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#issuecomment-959139706


   Many thanks @yunfengzhou-hub and @guoweiM for the review! Will merge~





[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r739975028



##########
File path: flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/FailingMap.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+
+/** Map Function triggers failover at the first runn. */

Review comment:
       runn -> round?







[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r739975373



##########
File path: flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/FailingMap.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+
+/** Map Function triggers failover at the first runn. */
+public class FailingMap extends RichMapFunction<Integer, Integer> {
+
+    private final int failingCount;
+
+    private int count;
+
+    public FailingMap(int failingCount) {
+        this.failingCount = failingCount;
+    }
+
+    @Override
+    public Integer map(Integer value) throws Exception {
+        count++;
+        if (getRuntimeContext().getIndexOfThisSubtask() == 0
+                && getRuntimeContext().getAttemptNumber() == 0
+                && count >= failingCount) {
+            throw new RuntimeException("badbad");

Review comment:
       I think this "badbad" could be more meaningful. 







[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r725956331



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/proxy/state/ProxyOperatorStateBackend.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration.proxy.state;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.RunnableFuture;
+
+/** Proxy {@link OperatorStateBackend} for the wrapped Operator. */
+public class ProxyOperatorStateBackend implements OperatorStateBackend {
+
+    private final OperatorStateBackend wrappedBackend;
+
+    private final String stateNamePrefix;
+
+    public ProxyOperatorStateBackend(OperatorStateBackend wrappedBackend, String stateNamePrefix) {
+        this.wrappedBackend = wrappedBackend;
+        this.stateNamePrefix = stateNamePrefix;
+    }
+
+    @Override
+    public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
+            throws Exception {
+        MapStateDescriptor<K, V> newDescriptor =
+                new MapStateDescriptor<>(
+                        stateNamePrefix + stateDescriptor.getName(),
+                        stateDescriptor.getKeySerializer(),
+                        stateDescriptor.getValueSerializer());
+        return wrappedBackend.getBroadcastState(newDescriptor);
+    }
+
+    @Override
+    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
+        ListStateDescriptor<S> newDescriptor =
+                new ListStateDescriptor<>(
+                        stateNamePrefix + stateDescriptor.getName(),
+                        stateDescriptor.getElementSerializer());
+        return wrappedBackend.getListState(newDescriptor);
+    }
+
+    @Override
+    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
+            throws Exception {
+        ListStateDescriptor<S> newDescriptor =
+                new ListStateDescriptor<S>(
+                        stateNamePrefix + stateDescriptor.getName(),
+                        stateDescriptor.getElementSerializer());
+        return wrappedBackend.getUnionListState(stateDescriptor);

Review comment:
       guess it should be `return wrappedBackend.getUnionListState(newDescriptor);`
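
       For clarity, the method with that one-line fix applied (assuming the reviewer's reading is correct) would be:

```java
@Override
public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
        throws Exception {
    // Prefix the state name, then forward the *prefixed* descriptor to the wrapped backend.
    ListStateDescriptor<S> newDescriptor =
            new ListStateDescriptor<>(
                    stateNamePrefix + stateDescriptor.getName(),
                    stateDescriptor.getElementSerializer());
    return wrappedBackend.getUnionListState(newDescriptor);
}
```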







[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r726816428



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/datacache/nonkeyed/DataCacheSnapshot.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration.datacache.nonkeyed;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The snapshot of a data cache. It could be written out or read from an external stream.O */
+public class DataCacheSnapshot {
+
+    private static final int CURRENT_VERSION = 1;
+
+    private final FileSystem fileSystem;
+
+    @Nullable private final Tuple2<Integer, Integer> readerPosition;
+
+    private final List<Segment> segments;
+
+    public DataCacheSnapshot(
+            FileSystem fileSystem,
+            @Nullable Tuple2<Integer, Integer> readerPosition,
+            List<Segment> segments) {
+        this.fileSystem = fileSystem;
+        this.readerPosition = readerPosition;
+        this.segments = segments;
+    }
+
+    public FileSystem getFileSystem() {
+        return fileSystem;
+    }
+
+    @Nullable
+    public Tuple2<Integer, Integer> getReaderPosition() {
+        return readerPosition;
+    }
+
+    public List<Segment> getSegments() {
+        return segments;
+    }
+
+    public void writeTo(OutputStream checkpointOutputStream) throws IOException {
+        try (DataOutputStream dos =
+                new DataOutputStream(new NonClosingOutpusStreamDecorator(checkpointOutputStream))) {
+            dos.writeInt(CURRENT_VERSION);
+            dos.writeBoolean(readerPosition != null);
+            if (readerPosition != null) {
+                dos.writeInt(readerPosition.f0);
+                dos.writeInt(readerPosition.f1);
+            }
+
+            dos.writeBoolean(fileSystem.isDistributedFS());
+            if (fileSystem.isDistributedFS()) {
+                // We only need to record the segments itself
+                dos.writeInt(segments.size());
+                for (Segment segment : segments) {
+                    dos.writeUTF(segment.getPath().toString());
+                    dos.writeInt(segment.getCount());
+                }
+            } else {
+                // We have to copy the whole streams.
+                int totalRecords = segments.stream().mapToInt(Segment::getCount).sum();
+                checkState(totalRecords >= 0, "overflowed: " + totalRecords);
+                dos.writeInt(totalRecords);
+
+                for (Segment segment : segments) {
+                    try (FSDataInputStream inputStream = fileSystem.open(segment.getPath())) {
+                        IOUtils.copyBytes(inputStream, checkpointOutputStream, false);
+                    }
+                }
+            }
+        }
+    }
+
+    public static <T> void replay(
+            InputStream checkpointInputStream,
+            TypeSerializer<T> serializer,
+            FileSystem fileSystem,
+            FeedbackConsumer<T> feedbackConsumer)
+            throws Exception {
+        try (DataInputStream dis =
+                new DataInputStream(new NonClosingInputStreamDecorator(checkpointInputStream))) {
+            int version = dis.readInt();
+            checkState(
+                    version == CURRENT_VERSION,
+                    "Currently only support version " + CURRENT_VERSION);
+            parseReaderPosition(dis);
+
+            boolean isDistributedFS = dis.readBoolean();
+            if (isDistributedFS) {
+                List<Segment> segments = parseSegments(dis);
+                DataCacheReader<T> dataCacheReader =
+                        new DataCacheReader<T>(serializer, fileSystem, segments);
+                while (dataCacheReader.hasNext()) {
+                    feedbackConsumer.processFeedback(dataCacheReader.next());
+                }
+            } else {
+                DataInputViewStreamWrapper dataInputView = new DataInputViewStreamWrapper(dis);
+                int totalRecords = dis.readInt();
+                for (int i = 0; i < totalRecords; ++i) {
+                    feedbackConsumer.processFeedback(serializer.deserialize(dataInputView));
+                }
+            }
+        }
+    }
+
+    public static DataCacheSnapshot recover(
+            InputStream checkpointInputStream,
+            FileSystem fileSystem,
+            SupplierWithException<Path, IOException> pathGenerator)
+            throws IOException {
+        try (DataInputStream dis =
+                new DataInputStream(new NonClosingInputStreamDecorator(checkpointInputStream))) {
+            int version = dis.readInt();
+            checkState(
+                    version == CURRENT_VERSION,
+                    "Currently only support version " + CURRENT_VERSION);
+            Tuple2<Integer, Integer> readerPosition = parseReaderPosition(dis);
+
+            boolean isDistributedFS = dis.readBoolean();
+            checkState(
+                    isDistributedFS == fileSystem.isDistributedFS(),
+                    "Currently we do not support changing the cache file system. "
+                            + "If required, please manually copy the directory from one filesystem to another.");
+
+            List<Segment> segments;
+            if (isDistributedFS) {
+                segments = parseSegments(dis);
+            } else {
+                int totalRecords = dis.readInt();
+                Path path = pathGenerator.get();
+                try (FSDataOutputStream outputStream =
+                        fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
+                    IOUtils.copyBytes(checkpointInputStream, outputStream, false);
+                }
+                segments = Collections.singletonList(new Segment(path, totalRecords));
+            }
+
+            return new DataCacheSnapshot(fileSystem, readerPosition, segments);
+        }
+    }

Review comment:
       `replay()` is used only in `HeadOperator.initializeState()` and `recover` is not used except in tests. Can we just delete `recover()` and rename `replay()` to `recover()`?







[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r740036597



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/allround/AbstractAllRoundWrapperOperator.java
##########
@@ -99,19 +99,18 @@ public AbstractAllRoundWrapperOperator(
 
     @Override
     public void onEpochWatermarkIncrement(int epochWatermark) throws IOException {
-        if (epochWatermark <= latestEpochWatermark) {
-            return;
+        if (epochWatermark > latestEpochWatermark) {
+            latestEpochWatermark = epochWatermark;
+
+            setIterationContextRound(epochWatermark);
+            processOperatorOrUdfIfSatisfy(
+                    wrappedOperator,
+                    IterationListener.class,
+                    listener -> notifyEpochWatermarkIncrement(listener, epochWatermark));
+            clearIterationContextRound();
         }
-        latestEpochWatermark = epochWatermark;
 
-        setIterationContextRound(epochWatermark);
-        processOperatorOrUdfIfSatisfy(
-                wrappedOperator,
-                IterationListener.class,
-                listener -> notifyEpochWatermarkIncrement(listener, epochWatermark));
-        clearIterationContextRound();
-
-        // Broadcast the events.
+        // Always broadcast the events.

Review comment:
       broadcasts?







[GitHub] [flink-ml] gaoyunhaii commented on a change in pull request #17: [FLINK-24655][iteration] Support the checkpoints for the iteration

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r741845456



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
         this.executor = Objects.requireNonNull(executor);
 
         this.statusByEpoch = new HashMap<>();
-        this.alignedConsumers = new HashMap<>();
+        this.listeners = new HashMap<>();
+        this.checkpointStatuses = new HashMap<>();
     }
 
     public void registerAlignedConsumer(
-            OperatorID operatorID, Consumer<GloballyAlignedEvent> alignedConsumer) {
+            OperatorID operatorID, SharedProgressAlignerListener alignedConsumer) {

Review comment:
       I'll use registerAlignedListener










[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r739975616



##########
File path: flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java
##########
@@ -49,16 +59,53 @@
 
     private transient OutputTag<OutputRecord<Integer>> outputTag;
 
+    private transient ListState<Map<Integer, Integer>> sumByRoundsState;
+
+    private transient ListState<Integer> cachedRecordsState;
+
     public ReduceAllRoundProcessFunction(boolean sync, int maxRound) {
         this.sync = sync;
         this.maxRound = maxRound;
     }
 
     @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
+    public void initializeState(FunctionInitializationContext functionInitializationContext)
+            throws Exception {
         this.sumByEpochs = new HashMap<>();
         cachedRecords = new ArrayList<>();

Review comment:
       this.cacheRecords?







[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r739945912



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/HeadOperatorCoordinator.java
##########
@@ -61,6 +62,12 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
         this.subtaskGateways[i] = subtaskGateway;
     }
 
+    @Override
+    public void resetToCheckpoint(long l, @Nullable byte[] bytes) {}

Review comment:
       l -> checkpointId

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
##########
@@ -155,12 +176,19 @@ public void processFeedback(StreamRecord<IterationRecord<?>> iterationRecord) th
     @Override
     public void handleOperatorEvent(OperatorEvent operatorEvent) {
         if (operatorEvent instanceof GloballyAlignedEvent) {
-            boolean shouldTerminate =
-                    recordProcessor.onGloballyAligned((GloballyAlignedEvent) operatorEvent);
-            if (shouldTerminate) {
-                status = HeadOperatorStatus.TERMINATING;
-                recordProcessor = new TerminatingHeadOperatorRecordProcessor();
-            }
+            checkpointAligner
+                    .checkHoldingGloballyAlignedEvent((GloballyAlignedEvent) operatorEvent)
+                    .ifPresent(this::processGloballyAlignedEvent);
+        } else if (operatorEvent instanceof CoordinatorCheckpointEvent) {
+            checkpointAligner.coordinatorNotify((CoordinatorCheckpointEvent) operatorEvent);
+        }

Review comment:
       Maybe we could throw `Exception` if the `operatorEvent` is unexpected.

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/HeadOperatorCoordinator.java
##########
@@ -61,6 +62,12 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
         this.subtaskGateways[i] = subtaskGateway;
     }
 
+    @Override
+    public void resetToCheckpoint(long l, @Nullable byte[] bytes) {}
+
+    @Override
+    public void subtaskFailed(int i, @Nullable Throwable throwable) {}

Review comment:
       i -> subtaskId or subtaskIndex?







[GitHub] [flink-ml] gaoyunhaii closed pull request #17: [FLINK-24655][iteration] Support the checkpoints for the iteration

Posted by GitBox <gi...@apache.org>.
gaoyunhaii closed pull request #17:
URL: https://github.com/apache/flink-ml/pull/17


   





[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r726828521



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/checkpoint/CheckpointsBroker.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration.checkpoint;
+
+import org.apache.flink.statefun.flink.core.feedback.SubtaskFeedbackKey;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Hand offs the {@link Checkpoints} from the head operator to the tail operator so that the tail
+ * operator could decrease the reference count of the raw state when checkpoints are aborted. We
+ * could not count on the head operator since it would be blocked on closing the raw state when
+ * aborting the checkpoint. It also looks like a bug.
+ */
+public class CheckpointsBroker {
+
+    private static final CheckpointsBroker INSTANCE = new CheckpointsBroker();
+
+    private final ConcurrentHashMap<SubtaskFeedbackKey<?>, Checkpoints<?>> checkpointManagers =
+            new ConcurrentHashMap<>();
+
+    public static CheckpointsBroker get() {
+        return INSTANCE;
+    }
+
+    public <V> void setCheckpoints(SubtaskFeedbackKey<V> key, Checkpoints<V> checkpoints) {
+        checkpointManagers.put(key, checkpoints);
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public <V> Checkpoints<V> getCheckpoints(SubtaskFeedbackKey<V> key) {
+        Objects.requireNonNull(key);
+        return (Checkpoints<V>) Objects.requireNonNull(checkpointManagers.get(key));
+    }
+
+    @SuppressWarnings("resource")
+    void removeChannel(SubtaskFeedbackKey<?> key) {

Review comment:
       `removeCheckpoints()` might be a better name.







[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r726848319



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/coordinator/HeadOperatorCoordinator.java
##########
@@ -63,10 +63,16 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
     }
 
     @Override
-    public void resetToCheckpoint(long l, @Nullable byte[] bytes) {}
+    public void resetToCheckpoint(long l, @Nullable byte[] bytes) {
+        for (int i = 0; i < context.currentParallelism(); ++i) {

Review comment:
       Guess we should either remove the `for` loop or add `i` to the parameter list of `removeProgressInfo`.







[GitHub] [flink-ml] gaoyunhaii commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r731782006



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/coordinator/HeadOperatorCoordinator.java
##########
@@ -63,10 +63,16 @@ public void subtaskReady(int i, SubtaskGateway subtaskGateway) {
     }
 
     @Override
-    public void resetToCheckpoint(long l, @Nullable byte[] bytes) {}
+    public void resetToCheckpoint(long l, @Nullable byte[] bytes) {
+        for (int i = 0; i < context.currentParallelism(); ++i) {

Review comment:
       We should indeed remove the for loop...







[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r739942123



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperatorCheckpointAligner.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator;
+
+import org.apache.flink.iteration.operator.event.CoordinatorCheckpointEvent;
+import org.apache.flink.iteration.operator.event.GloballyAlignedEvent;
+import org.apache.flink.util.function.RunnableWithException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Aligns the checkpoint barrier from the task inputs and the checkpoint event from the coordinator.
+ * Besides, it needs to hold the other operator events after the checkpoint event till the state is
+ * snapshot.
+ */
+class HeadOperatorCheckpointAligner {

Review comment:
       I __strongly suggest__ that we give more detailed documentation about why we do the checkpoint like this.
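
       As a starting point, such documentation could expand on what the quoted Javadoc already states; the wording below is only a suggestion derived from this discussion, not the authors' rationale:

```java
/**
 * Aligns the checkpoint barrier from the task inputs and the checkpoint event from the
 * coordinator: the head operator snapshots its state only after it has received both, so
 * that the snapshot is consistent with what the coordinator has aligned. Any other operator
 * event (e.g. a GloballyAlignedEvent) that arrives after the coordinator's checkpoint event
 * is held back until the state snapshot is taken, so that a restored checkpoint does not
 * reflect the effects of events that were not part of it.
 */
```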













[GitHub] [flink-ml] gaoyunhaii commented on a change in pull request #17: [FLINK-24655][iteration] Support the checkpoints for the iteration

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r741842056



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -224,4 +261,28 @@ public boolean isTerminated() {
             return totalRecord == 0 || (hasCriteriaStream && totalCriteriaRecord == 0);
         }
     }
+
+    private static class CheckpointStatus {
+
+        private final long totalHeadParallelism;
+
+        private final List<CompletableFuture<byte[]>> stateFutures = new ArrayList<>();
+
+        private int notifiedCoordinatorParallelism;

Review comment:
       I think `int` should be enough for representing the parallelism?

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
         this.executor = Objects.requireNonNull(executor);
 
         this.statusByEpoch = new HashMap<>();
-        this.alignedConsumers = new HashMap<>();
+        this.listeners = new HashMap<>();
+        this.checkpointStatuses = new HashMap<>();
     }
 
     public void registerAlignedConsumer(
-            OperatorID operatorID, Consumer<GloballyAlignedEvent> alignedConsumer) {
+            OperatorID operatorID, SharedProgressAlignerListener alignedConsumer) {

Review comment:
       I'll use registerAlignedListener

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
         this.executor = Objects.requireNonNull(executor);
 
         this.statusByEpoch = new HashMap<>();
-        this.alignedConsumers = new HashMap<>();
+        this.listeners = new HashMap<>();
+        this.checkpointStatuses = new HashMap<>();
     }
 
     public void registerAlignedConsumer(
-            OperatorID operatorID, Consumer<GloballyAlignedEvent> alignedConsumer) {
+            OperatorID operatorID, SharedProgressAlignerListener alignedConsumer) {
         runInEventLoop(
-                () -> this.alignedConsumers.put(operatorID, alignedConsumer),
-                "Register consumer %s",
+                () -> this.listeners.put(operatorID, alignedConsumer),
+                "Register listeners %s",
                 operatorID.toHexString());
     }
 
     public void unregisterConsumer(OperatorID operatorID) {
         synchronized (this) {

Review comment:
       The synchronized should not be required in either place since we now put all the operations into the same thread. I'll remove the synchronized here.
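
       As a side note on that reasoning, here is a minimal, generic sketch (hypothetical names, unrelated to the actual class) of why confining all mutations to a single event-loop thread makes the synchronized blocks unnecessary:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Illustrative only: when every mutation of the map is submitted to the same
 * single-threaded executor, the map is confined to that thread, so no
 * synchronized block is needed around the individual operations.
 */
final class SingleThreadConfinedRegistry {

    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final Map<String, Runnable> listeners = new HashMap<>(); // only touched on eventLoop

    void register(String id, Runnable listener) {
        eventLoop.execute(() -> listeners.put(id, listener));
    }

    void unregister(String id) {
        eventLoop.execute(() -> listeners.remove(id));
    }
}
```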

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
##########
@@ -53,13 +53,14 @@ public void processElement(StreamRecord<T> streamRecord) throws Exception {
 
     @Override
     public void endInput() throws Exception {
-        if (insertMaxEpochWatermark) {
-            reusable.replace(
-                    IterationRecord.newEpochWatermark(
-                            Integer.MAX_VALUE,
-                            OperatorUtils.getUniqueSenderId(
-                                    getOperatorID(), getRuntimeContext().getIndexOfThisSubtask())));
-            output.collect(reusable);
-        }
+        //        if (insertMaxEpochWatermark) {

Review comment:
       I'll remove this code~

##########
File path: flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java
##########
@@ -49,16 +59,53 @@
 
     private transient OutputTag<OutputRecord<Integer>> outputTag;
 
+    private transient ListState<Map<Integer, Integer>> sumByRoundsState;
+
+    private transient ListState<Integer> cachedRecordsState;
+
     public ReduceAllRoundProcessFunction(boolean sync, int maxRound) {
         this.sync = sync;
         this.maxRound = maxRound;
     }
 
     @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
+    public void initializeState(FunctionInitializationContext functionInitializationContext)
+            throws Exception {
         this.sumByEpochs = new HashMap<>();
         cachedRecords = new ArrayList<>();

Review comment:
       I'll remove all `this.`








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r726039828



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/HeadOperator.java
##########
@@ -129,12 +168,89 @@ public void setup(
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
 
+        parallelismState =
+                context.getOperatorStateStore()
+                        .getUnionListState(
+                                new ListStateDescriptor<>("parallelism", IntSerializer.INSTANCE));
+        OperatorStateUtils.getUniqueElement(parallelismState, "parallelism")
+                .ifPresent(
+                        oldParallelism ->
+                                checkState(
+                                        oldParallelism
+                                                == getRuntimeContext()
+                                                        .getNumberOfParallelSubtasks(),
+                                        "The head operator is recovered with parallelism changed from "
+                                                + oldParallelism
+                                                + " to "
+                                                + getRuntimeContext()
+                                                        .getNumberOfParallelSubtasks()));
+
+        // Initialize the status and the record processor.
         processorContext = new ContextImpl();
-        status = HeadOperatorStatus.RUNNING;
-        recordProcessor = new RegularHeadOperatorRecordProcessor(processorContext);
+        statusState =
+                context.getOperatorStateStore()
+                        .getListState(new ListStateDescriptor<>("status", Integer.class));
+        status =
+                HeadOperatorStatus.values()[
+                        OperatorStateUtils.getUniqueElement(statusState, "status").orElse(0)];
+        if (status == HeadOperatorStatus.RUNNING) {
+            recordProcessor = new RegularHeadOperatorRecordProcessor(processorContext);
+        } else {
+            recordProcessor = new TerminatingHeadOperatorRecordProcessor();

Review comment:
       In which case would a `HeadOperator` be terminated when it has not finished initialization?










[GitHub] [flink-ml] gaoyunhaii commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r731780041



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/HeadOperator.java
##########
@@ -129,12 +168,89 @@ public void setup(
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
 
+        parallelismState =
+                context.getOperatorStateStore()
+                        .getUnionListState(
+                                new ListStateDescriptor<>("parallelism", IntSerializer.INSTANCE));
+        OperatorStateUtils.getUniqueElement(parallelismState, "parallelism")
+                .ifPresent(
+                        oldParallelism ->
+                                checkState(
+                                        oldParallelism
+                                                == getRuntimeContext()
+                                                        .getNumberOfParallelSubtasks(),
+                                        "The head operator is recovered with parallelism changed from "
+                                                + oldParallelism
+                                                + " to "
+                                                + getRuntimeContext()
+                                                        .getNumberOfParallelSubtasks()));
+
+        // Initialize the status and the record processor.
         processorContext = new ContextImpl();
-        status = HeadOperatorStatus.RUNNING;
-        recordProcessor = new RegularHeadOperatorRecordProcessor(processorContext);
+        statusState =
+                context.getOperatorStateStore()
+                        .getListState(new ListStateDescriptor<>("status", Integer.class));
+        status =
+                HeadOperatorStatus.values()[
+                        OperatorStateUtils.getUniqueElement(statusState, "status").orElse(0)];
+        if (status == HeadOperatorStatus.RUNNING) {
+            recordProcessor = new RegularHeadOperatorRecordProcessor(processorContext);
+        } else {
+            recordProcessor = new TerminatingHeadOperatorRecordProcessor();

Review comment:
       If it previously finished and then took a checkpoint, but after that a failover happened and it restarted from the latest checkpoint.







[GitHub] [flink-ml] gaoyunhaii commented on a change in pull request #17: [FLINK-24655][iteration] Support the checkpoints for the iteration

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r741862947



##########
File path: flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java
##########
@@ -49,16 +59,53 @@
 
     private transient OutputTag<OutputRecord<Integer>> outputTag;
 
+    private transient ListState<Map<Integer, Integer>> sumByRoundsState;
+
+    private transient ListState<Integer> cachedRecordsState;
+
     public ReduceAllRoundProcessFunction(boolean sync, int maxRound) {
         this.sync = sync;
         this.maxRound = maxRound;
     }
 
     @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
+    public void initializeState(FunctionInitializationContext functionInitializationContext)
+            throws Exception {
         this.sumByEpochs = new HashMap<>();
         cachedRecords = new ArrayList<>();

Review comment:
       I'll remove all `this.`
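
       For readers following along, the underlying pattern here is Flink's `CheckpointedFunction`: the transient fields are rebuilt in `initializeState` and written out in `snapshotState`. The sketch below illustrates that pattern under assumed names (`StatefulReduceSketch`, `sumByEpochs`, `cachedRecords`); it is not the PR's exact code.

```java
// Sketch only: backing transient fields (a running-sum map and cached records)
// with operator list state. Shown standalone; in the test it would be combined
// with the actual process function.
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StatefulReduceSketch implements CheckpointedFunction {

    private transient Map<Integer, Integer> sumByEpochs;
    private transient List<Integer> cachedRecords;

    private transient ListState<Map<Integer, Integer>> sumByEpochsState;
    private transient ListState<Integer> cachedRecordsState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        sumByEpochs = new HashMap<>();
        cachedRecords = new ArrayList<>();

        sumByEpochsState =
                context.getOperatorStateStore()
                        .getListState(
                                new ListStateDescriptor<>(
                                        "sumByEpochs", Types.MAP(Types.INT, Types.INT)));
        cachedRecordsState =
                context.getOperatorStateStore()
                        .getListState(new ListStateDescriptor<>("cachedRecords", Types.INT));

        if (context.isRestored()) {
            // The map is stored as a single-element list; merge it back on restore.
            for (Map<Integer, Integer> restored : sumByEpochsState.get()) {
                sumByEpochs.putAll(restored);
            }
            for (Integer record : cachedRecordsState.get()) {
                cachedRecords.add(record);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        sumByEpochsState.update(Collections.singletonList(sumByEpochs));
        cachedRecordsState.update(cachedRecords);
    }
}
```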







[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r739974909



##########
File path: flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/BoundedAllRoundCheckpointTest.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.itcases.operators.FailingMap;
+import org.apache.flink.iteration.itcases.operators.OutputRecord;
+import org.apache.flink.iteration.itcases.operators.SequenceSource;
+import org.apache.flink.iteration.itcases.operators.TwoInputReduceAllRoundProcessFunction;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.testutils.junit.SharedReference;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.iteration.itcases.UnboundedStreamIterationITCase.createMiniClusterConfiguration;
+import static org.junit.Assert.assertEquals;
+
+/** Tests checkpoints. */
+@RunWith(Parameterized.class)
+public class BoundedAllRoundCheckpointTest {

Review comment:
       Please extend `TestLogger`
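
       For reference, that is a one-line change, assuming the Flink test-utils dependency that provides `TestLogger` is already on the test classpath (sketch):

```java
import org.apache.flink.util.TestLogger;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class BoundedAllRoundCheckpointTest extends TestLogger {
    // existing rules, fields, and test methods stay unchanged
}
```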







[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r739975206



##########
File path: flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/FailingMap.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.itcases.operators;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+
+/** Map Function triggers failover at the first runn. */

Review comment:
       first task and first round?
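
       For context, a test-only map of this kind is commonly written along the lines below: it throws once, on the first attempt of the first subtask, and then passes records through after the restart. This is only a sketch of the pattern; the class name and the threshold parameter are illustrative, not the PR's actual `FailingMap`.

```java
// Sketch of a test-only map that fails once to trigger a failover: it throws on
// the first attempt of subtask 0 after a given number of records, and forwards
// records untouched on the restarted attempt.
import org.apache.flink.api.common.functions.RichMapFunction;

public class FailingOnceMap<T> extends RichMapFunction<T, T> {

    private final int failAfterRecords;
    private int seen;

    public FailingOnceMap(int failAfterRecords) {
        this.failAfterRecords = failAfterRecords;
    }

    @Override
    public T map(T value) {
        seen++;
        boolean firstAttempt = getRuntimeContext().getAttemptNumber() == 0;
        boolean firstSubtask = getRuntimeContext().getIndexOfThisSubtask() == 0;
        if (firstAttempt && firstSubtask && seen >= failAfterRecords) {
            throw new RuntimeException("Artificial failure to test checkpoint recovery");
        }
        return value;
    }
}
```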







[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r737186361



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -224,4 +261,28 @@ public boolean isTerminated() {
             return totalRecord == 0 || (hasCriteriaStream && totalCriteriaRecord == 0);
         }
     }
+
+    private static class CheckpointStatus {
+
+        private final long totalHeadParallelism;
+
+        private final List<CompletableFuture<byte[]>> stateFutures = new ArrayList<>();
+
+        private int notifiedCoordinatorParallelism;

Review comment:
       int -> long?

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
         this.executor = Objects.requireNonNull(executor);
 
         this.statusByEpoch = new HashMap<>();
-        this.alignedConsumers = new HashMap<>();
+        this.listeners = new HashMap<>();
+        this.checkpointStatuses = new HashMap<>();
     }
 
     public void registerAlignedConsumer(
-            OperatorID operatorID, Consumer<GloballyAlignedEvent> alignedConsumer) {
+            OperatorID operatorID, SharedProgressAlignerListener alignedConsumer) {

Review comment:
       registerAlignedConsumer or registerAlignedListener?

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java
##########
@@ -45,14 +49,15 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.OutputTag;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
 import java.util.Objects;
 import java.util.concurrent.Executor;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * The head operators unions the initialized variable stream and the feedback stream, and

Review comment:
       operators -> operator?

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -31,20 +32,22 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * The progress aligner shared between multiple {@link HeadOperatorCoordinator}. It maintains the
  * information for each round, once one round is aligned, it would notify all the register
- * consumers.
+ * listenerss.

Review comment:
       listeners?

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
         this.executor = Objects.requireNonNull(executor);
 
         this.statusByEpoch = new HashMap<>();
-        this.alignedConsumers = new HashMap<>();
+        this.listeners = new HashMap<>();
+        this.checkpointStatuses = new HashMap<>();
     }
 
     public void registerAlignedConsumer(
-            OperatorID operatorID, Consumer<GloballyAlignedEvent> alignedConsumer) {
+            OperatorID operatorID, SharedProgressAlignerListener alignedConsumer) {
         runInEventLoop(
-                () -> this.alignedConsumers.put(operatorID, alignedConsumer),
-                "Register consumer %s",
+                () -> this.listeners.put(operatorID, alignedConsumer),
+                "Register listeners %s",
                 operatorID.toHexString());
     }
 
     public void unregisterConsumer(OperatorID operatorID) {
         synchronized (this) {

Review comment:
       Why does `unregisterConsumer` need `synchronized` but `registerAlignedConsumer` does not?







[GitHub] [flink-ml] guoweiM commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r740039154



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/ReplayOperator.java
##########
@@ -139,6 +142,22 @@ public void processElement(StreamRecord<IterationRecord<T>> element) throws Exce
         }
     }
 
+    @Override
+    public void processElement2(StreamRecord<IterationRecord<Void>> element) throws Exception {
+        if (element.getValue().getType() == IterationRecord.Type.EPOCH_WATERMARK) {
+            progressTracker.onEpochWatermark(
+                    1, element.getValue().getSender(), element.getValue().getEpoch());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Not supported element type: " + element.getValue());
+        }
+    }
+
+    @Override
+    public void endInput(int i) throws Exception {
+        progressTracker.finish(i - 1);

Review comment:
       Would you mind adding some comments for this?







[GitHub] [flink-ml] gaoyunhaii commented on a change in pull request #17: [FLINK-24655][iteration] Support the checkpoints for the iteration

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r741842056



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -224,4 +261,28 @@ public boolean isTerminated() {
             return totalRecord == 0 || (hasCriteriaStream && totalCriteriaRecord == 0);
         }
     }
+
+    private static class CheckpointStatus {
+
+        private final long totalHeadParallelism;
+
+        private final List<CompletableFuture<byte[]>> stateFutures = new ArrayList<>();
+
+        private int notifiedCoordinatorParallelism;

Review comment:
       I think `int` should be enough to represent the parallelism?

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
         this.executor = Objects.requireNonNull(executor);
 
         this.statusByEpoch = new HashMap<>();
-        this.alignedConsumers = new HashMap<>();
+        this.listeners = new HashMap<>();
+        this.checkpointStatuses = new HashMap<>();
     }
 
     public void registerAlignedConsumer(
-            OperatorID operatorID, Consumer<GloballyAlignedEvent> alignedConsumer) {
+            OperatorID operatorID, SharedProgressAlignerListener alignedConsumer) {

Review comment:
       I'll use registerAlignedListener

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
         this.executor = Objects.requireNonNull(executor);
 
         this.statusByEpoch = new HashMap<>();
-        this.alignedConsumers = new HashMap<>();
+        this.listeners = new HashMap<>();
+        this.checkpointStatuses = new HashMap<>();
     }
 
     public void registerAlignedConsumer(
-            OperatorID operatorID, Consumer<GloballyAlignedEvent> alignedConsumer) {
+            OperatorID operatorID, SharedProgressAlignerListener alignedConsumer) {
         runInEventLoop(
-                () -> this.alignedConsumers.put(operatorID, alignedConsumer),
-                "Register consumer %s",
+                () -> this.listeners.put(operatorID, alignedConsumer),
+                "Register listeners %s",
                 operatorID.toHexString());
     }
 
     public void unregisterConsumer(OperatorID operatorID) {
         synchronized (this) {

Review comment:
       The `synchronized` should not be required in either place, since all the operations now run in the same thread. I'll remove the `synchronized` here.

##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/InputOperator.java
##########
@@ -53,13 +53,14 @@ public void processElement(StreamRecord<T> streamRecord) throws Exception {
 
     @Override
     public void endInput() throws Exception {
-        if (insertMaxEpochWatermark) {
-            reusable.replace(
-                    IterationRecord.newEpochWatermark(
-                            Integer.MAX_VALUE,
-                            OperatorUtils.getUniqueSenderId(
-                                    getOperatorID(), getRuntimeContext().getIndexOfThisSubtask())));
-            output.collect(reusable);
-        }
+        //        if (insertMaxEpochWatermark) {

Review comment:
       I'll remove this code~

##########
File path: flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java
##########
@@ -49,16 +59,53 @@
 
     private transient OutputTag<OutputRecord<Integer>> outputTag;
 
+    private transient ListState<Map<Integer, Integer>> sumByRoundsState;
+
+    private transient ListState<Integer> cachedRecordsState;
+
     public ReduceAllRoundProcessFunction(boolean sync, int maxRound) {
         this.sync = sync;
         this.maxRound = maxRound;
     }
 
     @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
+    public void initializeState(FunctionInitializationContext functionInitializationContext)
+            throws Exception {
         this.sumByEpochs = new HashMap<>();
         cachedRecords = new ArrayList<>();

Review comment:
       I'll remove all `this.`







[GitHub] [flink-ml] gaoyunhaii commented on a change in pull request #17: [FLINK-24655][iteration] Support the checkpoints for the iteration

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r741845839



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/coordinator/SharedProgressAligner.java
##########
@@ -93,27 +98,28 @@ private SharedProgressAligner(
         this.executor = Objects.requireNonNull(executor);
 
         this.statusByEpoch = new HashMap<>();
-        this.alignedConsumers = new HashMap<>();
+        this.listeners = new HashMap<>();
+        this.checkpointStatuses = new HashMap<>();
     }
 
     public void registerAlignedConsumer(
-            OperatorID operatorID, Consumer<GloballyAlignedEvent> alignedConsumer) {
+            OperatorID operatorID, SharedProgressAlignerListener alignedConsumer) {
         runInEventLoop(
-                () -> this.alignedConsumers.put(operatorID, alignedConsumer),
-                "Register consumer %s",
+                () -> this.listeners.put(operatorID, alignedConsumer),
+                "Register listeners %s",
                 operatorID.toHexString());
     }
 
     public void unregisterConsumer(OperatorID operatorID) {
         synchronized (this) {

Review comment:
       The `synchronized` should not be required in either place, since all the operations now run in the same thread. I'll remove the `synchronized` here.
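
       For context, the thread-confinement pattern referred to here looks roughly like the sketch below: every mutation of the shared map is handed to a single-threaded executor, so no `synchronized` blocks are needed. The class and method names are illustrative, not the coordinator's actual code.

```java
// Sketch of the "confine all mutations to one thread" pattern: operations on the
// shared map are wrapped and executed on a single-threaded executor, which makes
// explicit locking unnecessary.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class EventLoopConfinedRegistry<K, V> {

    private final Executor eventLoop = Executors.newSingleThreadExecutor();
    private final Map<K, V> listeners = new HashMap<>(); // only touched from eventLoop

    public void register(K key, V listener) {
        runInEventLoop(() -> listeners.put(key, listener));
    }

    public void unregister(K key) {
        runInEventLoop(() -> listeners.remove(key));
    }

    private void runInEventLoop(Runnable action) {
        eventLoop.execute(action);
    }
}
```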







[GitHub] [flink-ml] gaoyunhaii closed pull request #17: [FLINK-24655][iteration] Support the checkpoints for the iteration

Posted by GitBox <gi...@apache.org>.
gaoyunhaii closed pull request #17:
URL: https://github.com/apache/flink-ml/pull/17









[GitHub] [flink-ml] gaoyunhaii commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r731534576



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/HeadOperator.java
##########
@@ -129,12 +168,89 @@ public void setup(
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
 
+        parallelismState =
+                context.getOperatorStateStore()
+                        .getUnionListState(
+                                new ListStateDescriptor<>("parallelism", IntSerializer.INSTANCE));
+        OperatorStateUtils.getUniqueElement(parallelismState, "parallelism")

Review comment:
       For operator state we only have list state (there is no `ValueState` on the operator state store), so a single value has to be kept as a one-element list.
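
       For readers unfamiliar with the API: `OperatorStateStore` only exposes list-shaped state (`getListState`, `getUnionListState`, broadcast state), so a single value is emulated with a one-element list. A minimal sketch of a `getUniqueElement`-style helper, with illustrative names:

```java
// Sketch: emulating a "value state" on top of operator list state. The helper
// expects at most one element in the list and returns it if present.
import org.apache.flink.api.common.state.ListState;

import java.util.Iterator;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkState;

public final class SingleValueOperatorState {

    private SingleValueOperatorState() {}

    public static <T> Optional<T> getUnique(ListState<T> state, String name) throws Exception {
        Iterator<T> iterator = state.get().iterator();
        if (!iterator.hasNext()) {
            return Optional.empty();
        }
        T value = iterator.next();
        checkState(!iterator.hasNext(), "The state " + name + " has more than one element");
        return Optional.of(value);
    }
}
```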







[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r726855274



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/headprocessor/RegularHeadOperatorRecordProcessor.java
##########
@@ -83,44 +107,88 @@ public boolean processFeedbackElement(StreamRecord<IterationRecord<?>> element)
     @Override
     public boolean onGloballyAligned(GloballyAlignedEvent globallyAlignedEvent) {
         LOG.info("Received global event {}", globallyAlignedEvent);
-
-        reusable.replace(
-                IterationRecord.newEpochWatermark(
-                        globallyAlignedEvent.isTerminated()
-                                ? Integer.MAX_VALUE
-                                : globallyAlignedEvent.getRound(),
-                        senderId),
-                0);
-        headOperatorContext.broadcastOutput(reusable);
+        checkState(
+                (globallyAlignedEvent.getRound() == 0 && latestRoundGloballyAligned == 0)
+                        || globallyAlignedEvent.getRound() > latestRoundGloballyAligned,
+                String.format(
+                        "Receive unexpected global aligned event, latest = %d, this one = %d",
+                        latestRoundGloballyAligned, globallyAlignedEvent.getRound()));
+
+        StreamRecord<IterationRecord<?>> record =
+                new StreamRecord<>(

Review comment:
       Why do we choose to create a new instance each time instead of reusing one instance?







[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #17: [FLINK-10][iteration] Support the checkpoints for the iteration.

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on a change in pull request #17:
URL: https://github.com/apache/flink-ml/pull/17#discussion_r726834717



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/operator/HeadOperator.java
##########
@@ -129,12 +168,89 @@ public void setup(
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
 
+        parallelismState =
+                context.getOperatorStateStore()
+                        .getUnionListState(
+                                new ListStateDescriptor<>("parallelism", IntSerializer.INSTANCE));
+        OperatorStateUtils.getUniqueElement(parallelismState, "parallelism")

Review comment:
       Personally speaking, a `ValueState` might be better for `parallelismState` than `ListState` if the list cannot have more than one element.



