You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/20 07:01:21 UTC

[GitHub] [flink] gaoyunhaii commented on a diff in pull request #20147: [FLINK-27524][datastream] Introduce cache API to DataStream

gaoyunhaii commented on code in PR #20147:
URL: https://github.com/apache/flink/pull/20147#discussion_r925041477


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it consumes the cached
+ * intermediate result in later jobs. In stream mode, it has no affect.
+ *
+ * @param <T>
+ */
+@Internal
+public class CacheTransformation<T> extends Transformation<T> {
+    private final Transformation<T> transformationToCache;
+    private final IntermediateDataSetID intermediateDataSetID;
+    private boolean isCached;
+    /**
+     * Creates a new {@code Transformation} with the given name, output type and parallelism.
+     *
+     * @param name The name of the {@code Transformation}, this will be shown in Visualizations and
+     *     the Log
+     * @param outputType The output type of this {@code Transformation}
+     * @param parallelism The parallelism of this {@code Transformation}
+     */
+    public CacheTransformation(
+            Transformation<T> transformationToCache,
+            String name,
+            TypeInformation<T> outputType,

Review Comment:
   Does the `CacheTransformation` always has the same outputType and parallelism with the given one? If so we might not allow users to reset the two parameters?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it consumes the cached
+ * intermediate result in later jobs. In stream mode, it has no affect.
+ *
+ * @param <T>

Review Comment:
   nit: complete the type param comment or remove this line.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java:
##########
@@ -21,22 +21,26 @@
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.Preconditions;
 
+import java.io.Serializable;
 import java.util.Comparator;
 import java.util.Map;
 import java.util.OptionalInt;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.ToIntFunction;
 
 /** Container for meta-data of a data set. */
-public final class DataSetMetaInfo {
+public final class DataSetMetaInfo implements Serializable {
     private static final int UNKNOWN = -1;
 
     private final int numRegisteredPartitions;
     private final int numTotalPartitions;
     private final SortedMap<ResultPartitionID, ShuffleDescriptor>
             shuffleDescriptorsOrderByPartitionId =
                     new TreeMap<>(
-                            Comparator.comparingInt(o -> o.getPartitionId().getPartitionNumber()));
+                            Comparator.comparingInt(

Review Comment:
   It seems this change not necessary ?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##########
@@ -2203,13 +2199,21 @@ public StreamGraph getStreamGraph() {
      */
     @Internal
     public StreamGraph getStreamGraph(boolean clearTransformations) {
+        updateCacheTransformation();

Review Comment:
   It should be not right to mark the transformations as cached here. This method might also be called if users want to get plans. 
   
   The update might happen at least after the job returns result in the `execute` method. The detach mode should cause errors.
   
   Also I'm a bit hesitate in whether to check the result of the job before setting the flag.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it consumes the cached
+ * intermediate result in later jobs. In stream mode, it has no affect.
+ *
+ * @param <T>
+ */
+@Internal
+public class CacheTransformation<T> extends Transformation<T> {
+    private final Transformation<T> transformationToCache;
+    private final IntermediateDataSetID intermediateDataSetID;

Review Comment:
   I'm a bit concern in introducing the `IntermediateDataSetID` into the datastream API layer. The meaning of IntermediateDataSetID couples too tightly with the implementations. We might use an AbstractID here ?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it consumes the cached
+ * intermediate result in later jobs. In stream mode, it has no affect.
+ *
+ * @param <T>
+ */
+@Internal
+public class CacheTransformation<T> extends Transformation<T> {
+    private final Transformation<T> transformationToCache;
+    private final IntermediateDataSetID intermediateDataSetID;
+    private boolean isCached;
+    /**
+     * Creates a new {@code Transformation} with the given name, output type and parallelism.
+     *
+     * @param name The name of the {@code Transformation}, this will be shown in Visualizations and
+     *     the Log
+     * @param outputType The output type of this {@code Transformation}
+     * @param parallelism The parallelism of this {@code Transformation}
+     */
+    public CacheTransformation(
+            Transformation<T> transformationToCache,
+            String name,
+            TypeInformation<T> outputType,
+            int parallelism) {
+        super(name, outputType, parallelism);
+        this.transformationToCache = transformationToCache;
+
+        this.intermediateDataSetID = new IntermediateDataSetID();
+        this.isCached = false;
+    }
+
+    @Override
+    public List<Transformation<?>> getTransitivePredecessors() {
+        List<Transformation<?>> result = Lists.newArrayList();
+        if (isCached) {
+            return result;
+        }
+        result.add(this);

Review Comment:
   Should `this` always be added ?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CachedDataStream.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.CacheTransformation;
+
+/**
+ * {@link CachedDataStream} represents a {@link DataStream} whose intermediate result will be cached
+ * at the first time when it is computed. And the cached intermediate result can be used in later
+ * job that using the same {@link CachedDataStream} to avoid re-computing the intermediate result.
+ *
+ * @param <T> The type of the elements in this stream.
+ */
+@PublicEvolving
+public class CachedDataStream<T> extends DataStream<T> implements AutoCloseable {
+    /**
+     * Create a new {@link CachedDataStream} in the given execution environment that wrap the given
+     * physical transformation to indicates that the transformation should be cached.
+     *
+     * @param environment The StreamExecutionEnvironment
+     * @param transformation The physical transformation whose intermediate result should be cached.
+     */
+    public CachedDataStream(
+            StreamExecutionEnvironment environment, Transformation<T> transformation) {
+        super(
+                environment,
+                new CacheTransformation<>(
+                        transformation,
+                        String.format("Cache: %s", transformation.getName()),
+                        transformation.getOutputType(),
+                        transformation.getParallelism()));
+
+        final CacheTransformation<T> t = (CacheTransformation<T>) this.getTransformation();
+        environment.addCache(t.getIntermediateDataSetID(), t);
+    }
+
+    /**
+     * Invalidate the cache intermediate result of this DataStream to release the physical
+     * resources. Users are not required to invoke this method to release physical resources unless
+     * they want to. The CachedDataStream should not be used after it is closed.
+     */
+    @Override
+    public void close() throws Exception {

Review Comment:
   I'm a bit tend to we use a dedicated name, like `invalidate` or `unpersist` to invalidate the cache and not extends `AutoClose`. Users usually create the cached stream via `a.map().cache()`, and it should be seldom to use it with try with resource.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/CacheTransformationTranslator.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.CacheTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/** Translator for {@link CacheTransformationTranslator}. */
+@Internal
+public class CacheTransformationTranslator<OUT, T extends CacheTransformation<OUT>>
+        extends SimpleTransformationTranslator<OUT, T> {
+
+    public static final String CACHE_CONSUMER_OPERATOR_NAME = "CacheRead";
+    public static final String CACHE_PRODUCER_OPERATOR_NAME = "CacheWrite";
+
+    @Override
+    protected Collection<Integer> translateForBatchInternal(T transformation, Context context) {
+        if (!transformation.isCached()) {
+            final List<Transformation<?>> inputs = transformation.getInputs();
+            Preconditions.checkState(
+                    inputs.size() == 1, "There could be only one transformation input to cache");
+            Transformation<?> input = inputs.get(0);
+            if (input instanceof PhysicalTransformation) {
+                return physicalTransformationProduceCache(transformation, context, input);
+            } else if (input instanceof SideOutputTransformation) {
+                return sideOutputTransformationProduceCache(
+                        transformation, context, (SideOutputTransformation<?>) input);
+            } else {
+                throw new RuntimeException(
+                        String.format("Unsupported transformation %s", input.getClass()));
+            }
+        } else {
+            return consumeCache(transformation, context);
+        }
+    }
+
+    @Override
+    protected Collection<Integer> translateForStreamingInternal(T transformation, Context context) {
+        if (transformation.isCached()) {
+            return consumeCache(transformation, context);
+        } else {
+            // do nothing and return the node id of the input node

Review Comment:
   I tend to we explicitly throw an exception here to notify users currently cache is not supported in streaming mode. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java:
##########
@@ -73,6 +74,9 @@ public class StreamEdge implements Serializable {
 
     private boolean supportsUnalignedCheckpoints = true;
 
+    private final IntermediateDataSetID intermediateDatasetId;

Review Comment:
   I think we could distinguish if cache is required from whether the give cache id is null, then we could omit the `shouldCache` parameters all the way. 
   
   Also we might name it to something like `id of cache to produce` to distinguish between `id of cache to consume` ?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java:
##########
@@ -437,4 +438,23 @@ public SingleOutputStreamOperator<T> setDescription(String description) {
         transformation.setDescription(description);
         return this;
     }
+
+    /**
+     * Cache the intermediate result of the transformation. Only job running in batch mode with

Review Comment:
   For the long run, I think the cache might be able to support bounded streams for both blocking and streaming mode. I'm a bit tend we change the description to something like "only support bounded streams, currently only blocking mode is supported". 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##########
@@ -2203,13 +2199,21 @@ public StreamGraph getStreamGraph() {
      */
     @Internal
     public StreamGraph getStreamGraph(boolean clearTransformations) {
+        updateCacheTransformation();
         final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
         if (clearTransformations) {
             transformations.clear();
         }
         return streamGraph;
     }
 
+    private void updateCacheTransformation() {
+        Set<AbstractID> cachedIntermediateDatasetIds = listCachedIntermediateDatasetIds();

Review Comment:
   I'm also a bit concern for acquiring the list of cached results from the RM side if we change the call position of this method to be after a job finished. 
   
   Logically if a job succeeds, all the involved transformations should be cached successfully, otherwise there should be unexpected behaviors happens. Thus I tend to that the client maintains the set of cached datasets by itself. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it consumes the cached
+ * intermediate result in later jobs. In stream mode, it has no affect.
+ *
+ * @param <T>
+ */
+@Internal
+public class CacheTransformation<T> extends Transformation<T> {
+    private final Transformation<T> transformationToCache;
+    private final IntermediateDataSetID intermediateDataSetID;
+    private boolean isCached;
+    /**
+     * Creates a new {@code Transformation} with the given name, output type and parallelism.
+     *
+     * @param name The name of the {@code Transformation}, this will be shown in Visualizations and
+     *     the Log
+     * @param outputType The output type of this {@code Transformation}
+     * @param parallelism The parallelism of this {@code Transformation}
+     */
+    public CacheTransformation(
+            Transformation<T> transformationToCache,
+            String name,
+            TypeInformation<T> outputType,
+            int parallelism) {
+        super(name, outputType, parallelism);
+        this.transformationToCache = transformationToCache;
+
+        this.intermediateDataSetID = new IntermediateDataSetID();
+        this.isCached = false;
+    }
+
+    @Override
+    public List<Transformation<?>> getTransitivePredecessors() {
+        List<Transformation<?>> result = Lists.newArrayList();
+        if (isCached) {
+            return result;
+        }
+        result.add(this);
+        result.addAll(transformationToCache.getTransitivePredecessors());
+        return result;
+    }
+
+    @Override
+    public List<Transformation<?>> getInputs() {
+        if (isCached) {
+            return Collections.emptyList();
+        }
+        return Collections.singletonList(transformationToCache);
+    }
+
+    public IntermediateDataSetID getIntermediateDataSetID() {
+        return intermediateDataSetID;
+    }
+
+    public Transformation<T> getTransformationToCache() {
+        return transformationToCache;
+    }
+
+    public void setCached(boolean cached) {

Review Comment:
   Might change to `markCached()` since logically we should only set it from false to true. 



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##########
@@ -2591,4 +2595,70 @@ private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(
     public List<Transformation<?>> getTransformations() {
         return transformations;
     }
+
+    @Internal
+    public <T> void addCache(
+            IntermediateDataSetID intermediateDataSetID, CacheTransformation<T> t) {
+        cachedTransformation.put(intermediateDataSetID, t);
+    }
+
+    @Internal
+    public void invalidateCache(IntermediateDataSetID intermediateDataSetId) throws Exception {
+        if (!cachedTransformation.containsKey(intermediateDataSetId)) {
+            return;

Review Comment:
   Should we throw exceptions here ?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##########
@@ -177,6 +182,9 @@ public class StreamExecutionEnvironment {
 
     protected final List<Transformation<?>> transformations = new ArrayList<>();
 
+    private final Map<IntermediateDataSetID, CacheTransformation<?>> cachedTransformation =

Review Comment:
   nit: `cachedTransformation` -> `cachedTransformations`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org