Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/20 09:19:17 UTC

[GitHub] [flink] Sxnan commented on a diff in pull request #20147: [FLINK-27524][datastream] Introduce cache API to DataStream

Sxnan commented on code in PR #20147:
URL: https://github.com/apache/flink/pull/20147#discussion_r925262604


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java:
##########
@@ -21,22 +21,26 @@
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.Preconditions;
 
+import java.io.Serializable;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.ToIntFunction;
 
 /** Container for meta-data of a data set. */
-public final class DataSetMetaInfo {
+public final class DataSetMetaInfo implements Serializable {
     private static final int UNKNOWN = -1;
 
     private final int numRegisteredPartitions;
     private final int numTotalPartitions;
     private final SortedMap<ResultPartitionID, ShuffleDescriptor>
             shuffleDescriptorsOrderByPartitionId =
                     new TreeMap<>(
-                            Comparator.comparingInt(o -> o.getPartitionId().getPartitionNumber()));
+                            Comparator.comparingInt(

Review Comment:
   This change marks the key-extractor lambda as serializable, so that `DataSetMetaInfo` as a whole is serializable.
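
For readers unfamiliar with the idiom, here is a minimal, self-contained sketch of why the cast matters. It is not the Flink code: `PartitionKey`, `roundTrip`, and `canSerialize` are hypothetical names used only for illustration. `Comparator.comparingInt` returns a comparator that is itself serializable, but it captures the key extractor, so Java serialization still fails unless the extractor lambda is also serializable. An intersection cast to `ToIntFunction<...> & Serializable` is one way to achieve that.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.ToIntFunction;

public class SerializableComparatorSketch {

    /** Hypothetical stand-in for ResultPartitionID; not the Flink class. */
    static final class PartitionKey implements Serializable {
        private static final long serialVersionUID = 1L;
        final int number;

        PartitionKey(int number) {
            this.number = number;
        }
    }

    /** The intersection cast makes the key-extractor lambda serializable. */
    static SortedMap<PartitionKey, String> newSerializableMap() {
        return new TreeMap<PartitionKey, String>(
                Comparator.comparingInt(
                        (ToIntFunction<PartitionKey> & Serializable) key -> key.number));
    }

    /** Without the cast, the comparator captures a non-serializable lambda. */
    static SortedMap<PartitionKey, String> newPlainMap() {
        return new TreeMap<PartitionKey, String>(
                Comparator.<PartitionKey>comparingInt(key -> key.number));
    }

    /** Serializes and deserializes a value with plain Java serialization. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization failed", e);
        }
    }

    /** Returns true if the value survives a serialization round trip. */
    static boolean canSerialize(Object value) {
        try {
            roundTrip(value);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("with cast:    " + canSerialize(newSerializableMap()));
        System.out.println("without cast: " + canSerialize(newPlainMap()));
    }
}
```

The sketch uses a `TreeMap` because it writes its comparator as part of its serialized form, which is why a map field like the one in the diff is affected.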



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it consumes the cached
+ * intermediate result in later jobs. In stream mode, it has no affect.
+ *
+ * @param <T>
+ */
+@Internal
+public class CacheTransformation<T> extends Transformation<T> {
+    private final Transformation<T> transformationToCache;
+    private final IntermediateDataSetID intermediateDataSetID;
+    private boolean isCached;
+    /**
+     * Creates a new {@code Transformation} with the given name, output type and parallelism.
+     *
+     * @param name The name of the {@code Transformation}, this will be shown in Visualizations and
+     *     the Log
+     * @param outputType The output type of this {@code Transformation}
+     * @param parallelism The parallelism of this {@code Transformation}
+     */
+    public CacheTransformation(
+            Transformation<T> transformationToCache,
+            String name,
+            TypeInformation<T> outputType,
+            int parallelism) {
+        super(name, outputType, parallelism);
+        this.transformationToCache = transformationToCache;
+
+        this.intermediateDataSetID = new IntermediateDataSetID();
+        this.isCached = false;
+    }
+
+    @Override
+    public List<Transformation<?>> getTransitivePredecessors() {
+        List<Transformation<?>> result = Lists.newArrayList();
+        if (isCached) {
+            return result;
+        }
+        result.add(this);

Review Comment:
   Unless I am missing something, I think `this` should always be added if the cache has not been produced yet.
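
To make the behavior under discussion concrete, here is a small sketch with hypothetical `Node` and `CacheNode` types standing in for `Transformation` and `CacheTransformation`. The first three statements of `CacheNode.getTransitivePredecessors` mirror the hunk above. The upstream walk after `result.add(this)` and the `markCached` helper are assumptions, since the quoted diff is cut off at that point.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CachePredecessorSketch {

    /** Hypothetical minimal node type; not Flink's Transformation. */
    static class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }

        /** Returns this node followed by everything upstream of it. */
        List<Node> getTransitivePredecessors() {
            List<Node> result = new ArrayList<>();
            result.add(this);
            for (Node input : inputs) {
                result.addAll(input.getTransitivePredecessors());
            }
            return result;
        }
    }

    /** Hypothetical cache node wrapping the node whose output is cached. */
    static class CacheNode extends Node {
        private final Node nodeToCache;
        private boolean isCached;

        CacheNode(Node nodeToCache) {
            super("Cache: " + nodeToCache.name);
            this.nodeToCache = nodeToCache;
        }

        /** Assumed hook that is called once the cache has been produced. */
        void markCached() {
            isCached = true;
        }

        @Override
        List<Node> getTransitivePredecessors() {
            List<Node> result = new ArrayList<>();
            if (isCached) {
                // As in the diff: once cached, nothing is reported.
                return result;
            }
            // Cache not produced yet: report this node ...
            result.add(this);
            // ... and (assumption) everything upstream of it.
            result.addAll(nodeToCache.getTransitivePredecessors());
            return result;
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source");
        Node map = new Node("map", source);
        CacheNode cache = new CacheNode(map);
        System.out.println(cache.getTransitivePredecessors().size());
        cache.markCached();
        System.out.println(cache.getTransitivePredecessors().size());
    }
}
```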



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CachedDataStream.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.CacheTransformation;
+
+/**
+ * {@link CachedDataStream} represents a {@link DataStream} whose intermediate result will be cached
+ * at the first time when it is computed. And the cached intermediate result can be used in later
+ * job that using the same {@link CachedDataStream} to avoid re-computing the intermediate result.
+ *
+ * @param <T> The type of the elements in this stream.
+ */
+@PublicEvolving
+public class CachedDataStream<T> extends DataStream<T> implements AutoCloseable {
+    /**
+     * Create a new {@link CachedDataStream} in the given execution environment that wrap the given
+     * physical transformation to indicates that the transformation should be cached.
+     *
+     * @param environment The StreamExecutionEnvironment
+     * @param transformation The physical transformation whose intermediate result should be cached.
+     */
+    public CachedDataStream(
+            StreamExecutionEnvironment environment, Transformation<T> transformation) {
+        super(
+                environment,
+                new CacheTransformation<>(
+                        transformation,
+                        String.format("Cache: %s", transformation.getName()),
+                        transformation.getOutputType(),
+                        transformation.getParallelism()));
+
+        final CacheTransformation<T> t = (CacheTransformation<T>) this.getTransformation();
+        environment.addCache(t.getIntermediateDataSetID(), t);
+    }
+
+    /**
+     * Invalidate the cache intermediate result of this DataStream to release the physical
+     * resources. Users are not required to invoke this method to release physical resources unless
+     * they want to. The CachedDataStream should not be used after it is closed.
+     */
+    @Override
+    public void close() throws Exception {

Review Comment:
   You are correct that users are less likely to use try-with-resources with `CachedDataStream`. The reason for naming the method `close` rather than `invalidate` or `unpersist` is that `close` implies the object should not be used afterwards, while `invalidate` and `unpersist` carry no such implication.
   
   That said, I am leaning toward keeping the name `close` but not extending `AutoCloseable`. What do you think?
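
As a rough illustration of that option, the sketch below uses a hypothetical `CachedResult` class (not `CachedDataStream`) that has a `close` method but does not implement `AutoCloseable`. The name still signals "do not use after this call", and the class could enforce it, while try-with-resources is simply unavailable. Whether `CachedDataStream` should reject use after close is an assumption of this sketch, not something stated in the PR.

```java
public class CloseWithoutAutoCloseableSketch {

    /** Hypothetical handle to a cached result; not Flink's CachedDataStream. */
    static final class CachedResult {
        private boolean closed;

        /** Releases the (imaginary) cached data; calling it twice is harmless. */
        void close() {
            closed = true;
        }

        /** Reads the cached data, rejecting any use after close(). */
        String read() {
            if (closed) {
                throw new IllegalStateException("CachedResult was closed and cannot be reused");
            }
            return "cached data";
        }
    }

    public static void main(String[] args) {
        CachedResult result = new CachedResult();
        System.out.println(result.read());

        // An explicit call is required: because CachedResult is not AutoCloseable,
        // "try (CachedResult r = new CachedResult()) { ... }" would not compile.
        result.close();

        try {
            result.read();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```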
   
   


