Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/12 08:59:44 UTC

[GitHub] [flink] tigrulya-exe opened a new pull request, #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

tigrulya-exe opened a new pull request, #21028:
URL: https://github.com/apache/flink/pull/21028

   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
      Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   This pull request adds FLIP-27 Source API alternatives for `FromElementsFunction` and `FromIteratorFunction`.
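Conceptually, the collection-based source serializes the elements into a byte array on the client, so the job graph only ships bytes and the elements themselves do not need to be Java-serializable. A minimal, Flink-free sketch of that round trip (the `encode`/`decode` helpers here are hypothetical stand-ins for Flink's `TypeSerializer`, and plain `int` elements stand in for arbitrary types):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SerializedElementsSketch {

    // Hypothetical stand-in for TypeSerializer#serialize: write each element in order.
    static byte[] encode(List<Integer> elements) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (int element : elements) {
                out.writeInt(element);
            }
        }
        return baos.toByteArray();
    }

    // Hypothetical stand-in for TypeSerializer#deserialize: read the same count back.
    static List<Integer> decode(byte[] bytes, int count) throws IOException {
        List<Integer> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            for (int i = 0; i < count; i++) {
                result.add(in.readInt());
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<Integer> elements = Arrays.asList(1, 2, 3);
        byte[] serialized = encode(elements);                         // once, on the client
        List<Integer> replayed = decode(serialized, elements.size()); // later, in the reader
        System.out.println(replayed.equals(elements));                // true
    }
}
```

The element count travels alongside the bytes (as in the quoted `SerializedElementsSplit`), since the byte array alone does not reveal where one element ends and the next begins.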
   
   ## Brief change log
   
     - Added CollectionSource
     - Added IteratorSource
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
  - Added tests for both sources, validating element emission, snapshotting, and recovery from a snapshot.
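The snapshot/recovery behaviour being tested boils down to a split that tracks how many elements it has already emitted, so a reader restored from a checkpoint can resume without re-emitting. A Flink-free sketch of that logic (class and method names are illustrative, not the actual split API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReplayableSplitSketch {
    private final List<String> elements;
    private int position; // number of elements already emitted

    ReplayableSplitSketch(List<String> elements, int position) {
        this.elements = elements;
        this.position = position;
    }

    boolean hasNext() {
        return position < elements.size();
    }

    String next() {
        return elements.get(position++);
    }

    // A snapshot is just the current position; recovery builds a split starting there.
    int snapshot() {
        return position;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d");
        ReplayableSplitSketch split = new ReplayableSplitSketch(data, 0);
        List<String> emitted = new ArrayList<>();
        emitted.add(split.next());
        emitted.add(split.next());

        int checkpoint = split.snapshot(); // taken before a simulated failure
        ReplayableSplitSketch restored = new ReplayableSplitSketch(data, checkpoint);
        while (restored.hasNext()) {
            emitted.add(restored.next()); // resume where the snapshot left off
        }
        System.out.println(emitted.equals(data)); // true: each element emitted exactly once
    }
}
```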
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (yes)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] zentol commented on a diff in pull request #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #21028:
URL: https://github.com/apache/flink/pull/21028#discussion_r993334383


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/source/CollectionSource.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.util.serialization.CollectionSerializer;
+import org.apache.flink.streaming.util.serialization.SimpleObjectSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * A {@link Source} implementation that reads data from a collection.
+ *
+ * <p>This source serializes the elements using Flink's type information. That way, any object
+ * transport using Java serialization will not be affected by the serializability of the elements.
+ *
+ * <p>Note: Parallelism of this source must be 1.
+ */
+@PublicEvolving

Review Comment:
   This doesn't need to be public(evolving). In fact none of these classes here have to be.
   We can keep these as internal implementation details of the corresponding methods in the `StreamExEnv`.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/source/IteratorSource.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.util.serialization.CollectionSerializer;
+import org.apache.flink.streaming.util.serialization.SimpleObjectSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * A Source that reads elements from an {@link Iterator} and emits them.
+ *
+ * <p>Note: Parallelism of this source must be 1.
+ */
+@PublicEvolving
+public class IteratorSource<E, IterT extends Iterator<E> & Serializable>
+        implements Source<E, IteratorSplit<E, IterT>, Collection<IteratorSplit<E, IterT>>> {
+    private final IterT iterator;
+    private final Boundedness boundedness;
+
+    protected IteratorSource(IterT iterator, Boundedness boundedness) {
+        this.iterator = iterator;
+        this.boundedness = boundedness;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return boundedness;

Review Comment:
   We can assume this to be bounded imo.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/source/CollectionSource.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.util.serialization.CollectionSerializer;
+import org.apache.flink.streaming.util.serialization.SimpleObjectSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * A {@link Source} implementation that reads data from a collection.
+ *
+ * <p>This source serializes the elements using Flink's type information. That way, any object
+ * transport using Java serialization will not be affected by the serializability of the elements.
+ *
+ * <p>Note: Parallelism of this source must be 1.
+ */
+@PublicEvolving
+public class CollectionSource<E>
+        implements Source<E, SerializedElementsSplit<E>, Collection<SerializedElementsSplit<E>>>,
+                OutputTypeConfigurable<E> {
+    private final transient Iterable<E> elements;
+    private final int elementsCount;
+
+    private byte[] serializedElements;
+    private TypeSerializer<E> serializer;
+
+    public CollectionSource(Iterable<E> elements, TypeSerializer<E> serializer) {
+        this.elements = Preconditions.checkNotNull(elements);
+        this.serializer = Preconditions.checkNotNull(serializer);
+        this.elementsCount = Iterables.size(elements);
+        checkIterable(elements, Object.class);
+        serializeElements();
+    }
+
+    public CollectionSource(Iterable<E> elements) {
+        this.elements = Preconditions.checkNotNull(elements);
+        this.elementsCount = Iterables.size(elements);
+        checkIterable(elements, Object.class);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<E, SerializedElementsSplit<E>> createReader(
+            SourceReaderContext readerContext) throws Exception {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<SerializedElementsSplit<E>, Collection<SerializedElementsSplit<E>>>
+            createEnumerator(SplitEnumeratorContext<SerializedElementsSplit<E>> enumContext)
+                    throws Exception {
+        SerializedElementsSplit<E> split =
+                new SerializedElementsSplit<>(serializedElements, elementsCount, serializer);
+        return new IteratorSourceEnumerator<>(enumContext, Collections.singletonList(split));
+    }
+
+    @Override
+    public SplitEnumerator<SerializedElementsSplit<E>, Collection<SerializedElementsSplit<E>>>
+            restoreEnumerator(
+                    SplitEnumeratorContext<SerializedElementsSplit<E>> enumContext,
+                    Collection<SerializedElementsSplit<E>> restoredSplits)
+                    throws Exception {
+        Preconditions.checkArgument(
+                restoredSplits.size() <= 1, "Parallelism of CollectionSource should be 1");
+        return new IteratorSourceEnumerator<>(enumContext, restoredSplits);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<SerializedElementsSplit<E>> getSplitSerializer() {
+        return new ElementsSplitSerializer<>();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<SerializedElementsSplit<E>>>
+            getEnumeratorCheckpointSerializer() {
+        return new CollectionSerializer<>(new ElementsSplitSerializer<>());
+    }
+
+    private void serializeElements() {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos)) {
+            for (E element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+            this.serializedElements = baos.toByteArray();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Serializing the source elements failed: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void setOutputType(TypeInformation<E> outTypeInfo, ExecutionConfig executionConfig) {
+        Preconditions.checkState(
+                elements != null,

Review Comment:
   `elements` is guaranteed to not be null.
   ```suggestion
                   serializer != null,
   ```



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/source/CollectionSourceTest.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.connector.source.CollectionSource;
+import org.apache.flink.streaming.api.connector.source.SerializedElementsSplit;
+import org.apache.flink.streaming.util.SourceTestHarnessUtils;
+
+import org.assertj.core.util.Lists;
+import org.junit.Test;

Review Comment:
   New tests should use JUnit 5.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/source/CollectionSource.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.util.serialization.CollectionSerializer;
+import org.apache.flink.streaming.util.serialization.SimpleObjectSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * A {@link Source} implementation that reads data from a collection.
+ *
+ * <p>This source serializes the elements using Flink's type information. That way, any object
+ * transport using Java serialization will not be affected by the serializability of the elements.
+ *
+ * <p>Note: Parallelism of this source must be 1.
+ */
+@PublicEvolving
+public class CollectionSource<E>
+        implements Source<E, SerializedElementsSplit<E>, Collection<SerializedElementsSplit<E>>>,
+                OutputTypeConfigurable<E> {
+    private final transient Iterable<E> elements;
+    private final int elementsCount;
+
+    private byte[] serializedElements;
+    private TypeSerializer<E> serializer;
+
+    public CollectionSource(Iterable<E> elements, TypeSerializer<E> serializer) {
+        this.elements = Preconditions.checkNotNull(elements);
+        this.serializer = Preconditions.checkNotNull(serializer);
+        this.elementsCount = Iterables.size(elements);
+        checkIterable(elements, Object.class);
+        serializeElements();
+    }
+
+    public CollectionSource(Iterable<E> elements) {
+        this.elements = Preconditions.checkNotNull(elements);
+        this.elementsCount = Iterables.size(elements);
+        checkIterable(elements, Object.class);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<E, SerializedElementsSplit<E>> createReader(
+            SourceReaderContext readerContext) throws Exception {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<SerializedElementsSplit<E>, Collection<SerializedElementsSplit<E>>>
+            createEnumerator(SplitEnumeratorContext<SerializedElementsSplit<E>> enumContext)
+                    throws Exception {
+        SerializedElementsSplit<E> split =
+                new SerializedElementsSplit<>(serializedElements, elementsCount, serializer);
+        return new IteratorSourceEnumerator<>(enumContext, Collections.singletonList(split));
+    }
+
+    @Override
+    public SplitEnumerator<SerializedElementsSplit<E>, Collection<SerializedElementsSplit<E>>>
+            restoreEnumerator(
+                    SplitEnumeratorContext<SerializedElementsSplit<E>> enumContext,
+                    Collection<SerializedElementsSplit<E>> restoredSplits)
+                    throws Exception {
+        Preconditions.checkArgument(
+                restoredSplits.size() <= 1, "Parallelism of CollectionSource should be 1");
+        return new IteratorSourceEnumerator<>(enumContext, restoredSplits);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<SerializedElementsSplit<E>> getSplitSerializer() {
+        return new ElementsSplitSerializer<>();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<SerializedElementsSplit<E>>>
+            getEnumeratorCheckpointSerializer() {
+        return new CollectionSerializer<>(new ElementsSplitSerializer<>());
+    }
+
+    private void serializeElements() {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos)) {
+            for (E element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+            this.serializedElements = baos.toByteArray();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Serializing the source elements failed: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void setOutputType(TypeInformation<E> outTypeInfo, ExecutionConfig executionConfig) {
+        Preconditions.checkState(
+                elements != null,
+                "The output type should've been specified before shipping the graph to the cluster");
+        checkIterable(elements, outTypeInfo.getTypeClass());
+        TypeSerializer<E> newSerializer = outTypeInfo.createSerializer(executionConfig);
+        if (Objects.equals(serializer, newSerializer)) {
+            return;
+        }
+        serializer = newSerializer;
+        serializeElements();
+    }
+
+    @VisibleForTesting
+    public byte[] getSerializedElements() {

Review Comment:
   ```suggestion
       byte[] getSerializedElements() {
   ```
   It _shouldn't_ be necessary for this to be public.





[GitHub] [flink] tigrulya-exe commented on a diff in pull request #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

Posted by GitBox <gi...@apache.org>.
tigrulya-exe commented on code in PR #21028:
URL: https://github.com/apache/flink/pull/21028#discussion_r1023852295


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/connector/source/CollectionSource.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.util.serialization.CollectionSerializer;
+import org.apache.flink.streaming.util.serialization.SimpleObjectSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * A {@link Source} implementation that reads data from a collection.
+ *
+ * <p>This source serializes the elements using Flink's type information. That way, any object
+ * transport using Java serialization will not be affected by the serializability of the elements.
+ *
+ * <p>Note: Parallelism of this source must be 1.
+ */
+@PublicEvolving
+public class CollectionSource<E>
+        implements Source<E, SerializedElementsSplit<E>, Collection<SerializedElementsSplit<E>>>,
+                OutputTypeConfigurable<E> {
+    private final transient Iterable<E> elements;
+    private final int elementsCount;
+
+    private byte[] serializedElements;
+    private TypeSerializer<E> serializer;
+
+    public CollectionSource(Iterable<E> elements, TypeSerializer<E> serializer) {
+        this.elements = Preconditions.checkNotNull(elements);
+        this.serializer = Preconditions.checkNotNull(serializer);
+        this.elementsCount = Iterables.size(elements);
+        checkIterable(elements, Object.class);
+        serializeElements();
+    }
+
+    public CollectionSource(Iterable<E> elements) {
+        this.elements = Preconditions.checkNotNull(elements);
+        this.elementsCount = Iterables.size(elements);
+        checkIterable(elements, Object.class);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<E, SerializedElementsSplit<E>> createReader(
+            SourceReaderContext readerContext) throws Exception {
+        return new IteratorSourceReader<>(readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<SerializedElementsSplit<E>, Collection<SerializedElementsSplit<E>>>
+            createEnumerator(SplitEnumeratorContext<SerializedElementsSplit<E>> enumContext)
+                    throws Exception {
+        SerializedElementsSplit<E> split =
+                new SerializedElementsSplit<>(serializedElements, elementsCount, serializer);
+        return new IteratorSourceEnumerator<>(enumContext, Collections.singletonList(split));
+    }
+
+    @Override
+    public SplitEnumerator<SerializedElementsSplit<E>, Collection<SerializedElementsSplit<E>>>
+            restoreEnumerator(
+                    SplitEnumeratorContext<SerializedElementsSplit<E>> enumContext,
+                    Collection<SerializedElementsSplit<E>> restoredSplits)
+                    throws Exception {
+        Preconditions.checkArgument(
+                restoredSplits.size() <= 1, "Parallelism of CollectionSource should be 1");
+        return new IteratorSourceEnumerator<>(enumContext, restoredSplits);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<SerializedElementsSplit<E>> getSplitSerializer() {
+        return new ElementsSplitSerializer<>();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<Collection<SerializedElementsSplit<E>>>
+            getEnumeratorCheckpointSerializer() {
+        return new CollectionSerializer<>(new ElementsSplitSerializer<>());
+    }
+
+    private void serializeElements() {
+        Preconditions.checkState(serializer != null, "serializer not set");
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(baos)) {
+            for (E element : elements) {
+                serializer.serialize(element, wrapper);
+            }
+            this.serializedElements = baos.toByteArray();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Serializing the source elements failed: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void setOutputType(TypeInformation<E> outTypeInfo, ExecutionConfig executionConfig) {
+        Preconditions.checkState(
+                elements != null,

Review Comment:
   Replaced with `outTypeInfo != null` check
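The serializeElements() method in the diff above writes every element into a single byte array through a wrapped output stream, so that the resulting split can lazily deserialize elements on the reader side. The round-trip pattern can be sketched in plain Java, using DataOutputStream/DataInputStream in place of Flink's TypeSerializer and DataOutputViewStreamWrapper (class and method names below are illustrative, not Flink API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ElementsRoundTrip {

    // Serialize all elements into one byte array, mirroring serializeElements().
    public static byte[] serialize(List<Integer> elements) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            for (Integer element : elements) {
                out.writeInt(element);
            }
        }
        return baos.toByteArray();
    }

    // Deserialize the elements back, as the split's iterator would on the reader side.
    // The element count is tracked separately, like elementsCount in the source above.
    public static List<Integer> deserialize(byte[] bytes, int elementsCount)
            throws IOException {
        List<Integer> result = new ArrayList<>(elementsCount);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            for (int i = 0; i < elementsCount; i++) {
                result.add(in.readInt());
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<Integer> elements = Arrays.asList(1, 2, 3);
        byte[] serialized = serialize(elements);
        System.out.println(deserialize(serialized, elements.size()));
    }
}
```

Storing the count alongside the bytes (rather than a terminator in the stream) is what lets the split report progress and be checkpointed at an element boundary.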



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] tigrulya-exe commented on pull request #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

Posted by GitBox <gi...@apache.org>.
tigrulya-exe commented on PR #21028:
URL: https://github.com/apache/flink/pull/21028#issuecomment-1316825440

   Hi, @zentol ! I've resolved comments, replaced old sources in related methods of StreamExEnv with new ones and fixed all failed tests that followed this replacement. Now I'm waiting for your decision about `DataGeneratorSource`




[GitHub] [flink] flinkbot commented on pull request #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21028:
URL: https://github.com/apache/flink/pull/21028#issuecomment-1275836856

   ## CI report:
   
   * c06b60a09b3897d82408a02daa2250c138f2565f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] afedulov commented on pull request #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

Posted by "afedulov (via GitHub)" <gi...@apache.org>.
afedulov commented on PR #21028:
URL: https://github.com/apache/flink/pull/21028#issuecomment-1604224171

   @zentol I implemented the alternative approach you proposed based on the `DataGeneratorSource`. There are a couple of open questions, but I'd like to get your general feedback before working on them.




[GitHub] [flink] afedulov commented on pull request #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods

Posted by "afedulov (via GitHub)" <gi...@apache.org>.
afedulov commented on PR #21028:
URL: https://github.com/apache/flink/pull/21028#issuecomment-1601564665

   @zentol One concern regarding the idea to migrate the existing methods in `StreamExecutionEnvironment` to `DataGeneratorSource` is that adding the `flink-streaming-java` dependency on `flink-connector-datagen` introduces a somewhat unexpected cycle between `flink-architecture-tests-test` and `flink-clients`. How can we address this?




Re: [PR] [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods [flink]

Posted by "tigrulya-exe (via GitHub)" <gi...@apache.org>.
tigrulya-exe closed pull request #21028: [FLINK-28229][streaming-java] Introduce Source API alternatives for StreamExecutionEnvironment#fromCollection() methods
URL: https://github.com/apache/flink/pull/21028

