Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/26 23:56:24 UTC

[GitHub] [flink] afedulov opened a new pull request, #19588: [FLINK-XXXX] [core] Add Generator source

afedulov opened a new pull request, #19588:
URL: https://github.com/apache/flink/pull/19588

   Draft PR for early feedback.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] afedulov commented on a diff in pull request #19588: [FLINK-XXXX] [core] Add Generator source

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19588:
URL: https://github.com/apache/flink/pull/19588#discussion_r863717974


##########
flink-core/src/main/java/org/apache/flink/util/NewSplittableIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract base class for iterators that can split themselves into multiple disjoint iterators. The
+ * union of these iterators returns the original iterator values.
+ *
+ * @param <T> The type of elements returned by the iterator.
+ */
+@Public
+public abstract class NewSplittableIterator<T> implements Iterator<T>, Serializable {

Review Comment:
   This is just to discuss deprecation; the class is not intended to be named like that.
   





[GitHub] [flink] fapaul commented on a diff in pull request #19588: [FLINK-XXXX] [core] Add Generator source

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19588:
URL: https://github.com/apache/flink/pull/19588#discussion_r863672111


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.InstantiationUtil.serializeObject;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@Public
+public class GeneratorSource<OUT>
+        implements Source<OUT, NumberSequenceSplit<OUT>, Collection<NumberSequenceSplit<OUT>>>,
+                ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The end number in the sequence, inclusive. */
+    private final long count;
+
+    private final TypeInformation<OUT> typeInfo;
+
+    public final MapFunction<Long, OUT> mapFunction;
+
+    public final Boundedness boundedness;
+
+    public long getCount() {
+        return count;
+    }
+
+    private GeneratorSource(
+            MapFunction<Long, OUT> mapFunction, long count, TypeInformation<OUT> typeInfo) {
+        this.mapFunction = checkNotNull(mapFunction);
+        checkArgument(count > 0, "count must be > 0");
+        this.count = count;
+        this.typeInfo = typeInfo;
+        boundedness =
+                count == Long.MAX_VALUE ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    public static <OUT> GeneratorSource<OUT> from(
+            MapFunction<Long, OUT> mapFunction, long count, TypeInformation<OUT> typeInfo) {

Review Comment:
   I would not use `MapFunction` here, but rather the standard Java `Function`.
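   Separately from the choice of function type, the private constructor quoted in this hunk derives `boundedness` from `count`. As quoted, `count == Long.MAX_VALUE` maps to `BOUNDED`, which looks like the reverse of the likely intent: an effectively endless sequence would normally be unbounded. Below is a minimal sketch of the presumably intended mapping. `Boundedness` is a local stand-in enum that assumes the same two constants as Flink's, and `boundednessFor` is a hypothetical helper, not code from the PR.

```java
public class BoundednessSketch {

    // Local stand-in for Flink's Boundedness enum (assumption: same two constants).
    enum Boundedness {
        BOUNDED,
        CONTINUOUS_UNBOUNDED
    }

    // Hypothetical helper: Long.MAX_VALUE is treated as "no end",
    // any other positive count as a finite (bounded) run.
    static Boundedness boundednessFor(long count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be > 0");
        }
        return count == Long.MAX_VALUE ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
    }

    public static void main(String[] args) {
        System.out.println(boundednessFor(1_000));          // BOUNDED
        System.out.println(boundednessFor(Long.MAX_VALUE)); // CONTINUOUS_UNBOUNDED
    }
}
```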



##########
flink-core/src/main/java/org/apache/flink/util/NewSplittableIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract base class for iterators that can split themselves into multiple disjoint iterators. The
+ * union of these iterators returns the original iterator values.
+ *
+ * @param <T> The type of elements returned by the iterator.
+ */
+@Public
+public abstract class NewSplittableIterator<T> implements Iterator<T>, Serializable {

Review Comment:
   The name is not perfect ;)



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorSource.java:
##########
@@ -46,25 +46,10 @@
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-/**
- * A data source that produces a sequence of numbers (longs). This source is useful for testing and
- * for cases that just need a stream of N events of any kind.
- *
- * <p>The source splits the sequence into as many parallel sub-sequences as there are parallel
- * source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is
- * limited to one, this will produce one sequence in order.
- *
- * <p>This source is always bounded. For very long sequences (for example over the entire domain of
- * long integer values), user may want to consider executing the application in a streaming manner,
- * because, despite the fact that the produced stream is bounded, the end bound is pretty far away.
- */
 @Public
-public class NumberSequenceSource
-        implements Source<
-                        Long,
-                        NumberSequenceSource.NumberSequenceSplit,
-                        Collection<NumberSequenceSource.NumberSequenceSplit>>,
-                ResultTypeQueryable<Long> {
+public class GeneratorSource<OUT>

Review Comment:
   Just FYI: you need to keep `NumberSequenceSource` because it is annotated `@Public` and can only be changed in a breaking way with a new major release.
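   The Javadoc removed in this hunk describes how `NumberSequenceSource` splits its sequence: one in-order sub-sequence per parallel source reader, so parallelism one yields the whole sequence in order. A minimal sketch of that partitioning arithmetic follows, assuming contiguous sub-ranges of near-equal size. It is not the `NumberSequenceSource` implementation (its split code is not shown here); `SequenceSplitSketch`, `Range` and `split` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class SequenceSplitSketch {

    // Hypothetical helper type: an inclusive range [from, to] of longs.
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + "]";
        }
    }

    // Splits [from, to] into at most `parallelism` contiguous sub-ranges (one per reader)
    // whose sizes differ by at most one. Each sub-range is produced in ascending order.
    static List<Range> split(long from, long to, int parallelism) {
        if (parallelism <= 0 || from > to) {
            throw new IllegalArgumentException("need parallelism > 0 and from <= to");
        }
        // Throws ArithmeticException if the range holds more than Long.MAX_VALUE elements.
        long total = Math.addExact(Math.subtractExact(to, from), 1);
        long base = total / parallelism;
        long extra = total % parallelism;
        List<Range> ranges = new ArrayList<>();
        long start = from;
        for (int i = 0; i < parallelism; i++) {
            long size = base + (i < extra ? 1 : 0);
            if (size == 0) {
                break; // more readers than elements: the remaining readers get nothing
            }
            ranges.add(new Range(start, start + size - 1));
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        System.out.println(split(0, 9, 3)); // [[0, 3], [4, 6], [7, 9]]
    }
}
```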





[GitHub] [flink] flinkbot commented on pull request #19588: [FLINK-XXXX] [core] Add Generator source

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19588:
URL: https://github.com/apache/flink/pull/19588#issuecomment-1110359299

   ## CI report:
   
   * e72d31992721dc5753b3e6ccb53aba7a948e2893 UNKNOWN
   
   Bot commands:
     The @flinkbot bot supports the following commands:

    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] fapaul commented on a diff in pull request #19588: [FLINK-XXXX] [core] Add Generator source

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19588:
URL: https://github.com/apache/flink/pull/19588#discussion_r863724935


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.InstantiationUtil.serializeObject;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@Public
+public class GeneratorSource<OUT>
+        implements Source<OUT, NumberSequenceSplit<OUT>, Collection<NumberSequenceSplit<OUT>>>,
+                ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The end number in the sequence, inclusive. */
+    private final long count;
+
+    private final TypeInformation<OUT> typeInfo;
+
+    public final MapFunction<Long, OUT> mapFunction;
+
+    public final Boundedness boundedness;
+
+    public long getCount() {
+        return count;
+    }
+
+    private GeneratorSource(
+            MapFunction<Long, OUT> mapFunction, long count, TypeInformation<OUT> typeInfo) {
+        this.mapFunction = checkNotNull(mapFunction);
+        checkArgument(count > 0, "count must be > 0");
+        this.count = count;
+        this.typeInfo = typeInfo;
+        boundedness =
+                count == Long.MAX_VALUE ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    public static <OUT> GeneratorSource<OUT> from(
+            MapFunction<Long, OUT> mapFunction, long count, TypeInformation<OUT> typeInfo) {

Review Comment:
   There is also a `SerializableFunction` in Flink. I think that is a better fit.





[GitHub] [flink] afedulov commented on a diff in pull request #19588: [FLINK-XXXX] [core] Add Generator source

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19588:
URL: https://github.com/apache/flink/pull/19588#discussion_r863717974


##########
flink-core/src/main/java/org/apache/flink/util/NewSplittableIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract base class for iterators that can split themselves into multiple disjoint iterators. The
+ * union of these iterators returns the original iterator values.
+ *
+ * @param <T> The type of elements returned by the iterator.
+ */
+@Public
+public abstract class NewSplittableIterator<T> implements Iterator<T>, Serializable {

Review Comment:
   This is just to discuss whether we want to deprecate the "old" one; the class is not intended to be named like that.
   





[GitHub] [flink] zentol closed pull request #19588: [FLINK-XXXX] [core] Add Generator source

Posted by GitBox <gi...@apache.org>.
zentol closed pull request #19588: [FLINK-XXXX] [core] Add Generator source
URL: https://github.com/apache/flink/pull/19588




[GitHub] [flink] afedulov commented on a diff in pull request #19588: [FLINK-XXXX] [core] Add Generator source

Posted by GitBox <gi...@apache.org>.
afedulov commented on code in PR #19588:
URL: https://github.com/apache/flink/pull/19588#discussion_r863717635


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorSource.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.util.InstantiationUtil.serializeObject;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@Public
+public class GeneratorSource<OUT>
+        implements Source<OUT, NumberSequenceSplit<OUT>, Collection<NumberSequenceSplit<OUT>>>,
+                ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The end number in the sequence, inclusive. */
+    private final long count;
+
+    private final TypeInformation<OUT> typeInfo;
+
+    public final MapFunction<Long, OUT> mapFunction;
+
+    public final Boundedness boundedness;
+
+    public long getCount() {
+        return count;
+    }
+
+    private GeneratorSource(
+            MapFunction<Long, OUT> mapFunction, long count, TypeInformation<OUT> typeInfo) {
+        this.mapFunction = checkNotNull(mapFunction);
+        checkArgument(count > 0, "count must be > 0");
+        this.count = count;
+        this.typeInfo = typeInfo;
+        boundedness =
+                count == Long.MAX_VALUE ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    public static <OUT> GeneratorSource<OUT> from(
+            MapFunction<Long, OUT> mapFunction, long count, TypeInformation<OUT> typeInfo) {

Review Comment:
   It was Arvid's suggestion, since the function has to be serializable.
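   The serializability requirement can be illustrated with plain Java serialization: the source, including the user function it holds, is shipped to the cluster in serialized form (the quoted class already imports `InstantiationUtil.serializeObject`). A lambda typed as `java.util.function.Function` is not serializable, whereas one typed as an interface that also extends `Serializable` is. The sketch below defines a local `SerializableFunction` purely for illustration; it mirrors the idea of the `SerializableFunction` mentioned earlier in this thread but does not depend on it. `roundTrip` and `survivesRoundTrip` are hypothetical helpers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class SerializableFunctionSketch {

    // Local illustration: a Function that is also Serializable, so lambdas targeting it are serializable.
    @FunctionalInterface
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Writes the object with Java serialization and reads it back.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Returns false if serialization fails (e.g. NotSerializableException for a plain lambda).
    static boolean survivesRoundTrip(Object value) {
        try {
            roundTrip(value);
            return true;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        SerializableFunction<Long, String> generator = index -> "event-" + index;
        System.out.println(roundTrip(generator).apply(42L)); // event-42

        Function<Long, String> plain = index -> "event-" + index;
        System.out.println(survivesRoundTrip(plain)); // false
    }
}
```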





[GitHub] [flink] fapaul commented on a diff in pull request #19588: [FLINK-XXXX] [core] Add Generator source

Posted by GitBox <gi...@apache.org>.
fapaul commented on code in PR #19588:
URL: https://github.com/apache/flink/pull/19588#discussion_r863728326


##########
flink-core/src/main/java/org/apache/flink/util/NewSplittableIterator.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract base class for iterators that can split themselves into multiple disjoint iterators. The
+ * union of these iterators returns the original iterator values.
+ *
+ * @param <T> The type of elements returned by the iterator.
+ */
+@Public
+public abstract class NewSplittableIterator<T> implements Iterator<T>, Serializable {

Review Comment:
   Is the only proposed change to adjust the signature of `split` from arrays to lists? Can you give a quick explanation of the motivation?
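   For context on the array-versus-list question: one plausible reason, not stated in this thread, is that Java cannot create generic arrays (`new Iterator<T>[n]` does not compile), so array-returning implementations typically need unchecked casts. Below is a minimal sketch of a list-returning variant whose parts are disjoint and whose union equals the original values, as the quoted Javadoc requires. `ListSplittableIterator`, `LongRangeIterator` and `drainAll` are hypothetical names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ListSplitSketch {

    // Hypothetical base class: split returns a List, so no generic array has to be created.
    abstract static class ListSplittableIterator<T> implements Iterator<T> {
        abstract List<ListSplittableIterator<T>> split(int numPartitions);
    }

    // Iterates over `count` consecutive longs starting at `first`.
    static final class LongRangeIterator extends ListSplittableIterator<Long> {
        private long next;
        private long remaining;

        LongRangeIterator(long first, long count) {
            this.next = first;
            this.remaining = count;
        }

        @Override
        public boolean hasNext() {
            return remaining > 0;
        }

        @Override
        public Long next() {
            if (remaining == 0) {
                throw new NoSuchElementException();
            }
            remaining--;
            return next++;
        }

        // Splits the not-yet-consumed values into disjoint, contiguous parts
        // whose sizes differ by at most one (parts may be empty).
        @Override
        List<ListSplittableIterator<Long>> split(int numPartitions) {
            if (numPartitions <= 0) {
                throw new IllegalArgumentException("numPartitions must be > 0");
            }
            List<ListSplittableIterator<Long>> parts = new ArrayList<>(numPartitions);
            long base = remaining / numPartitions;
            long extra = remaining % numPartitions;
            long start = next;
            for (int i = 0; i < numPartitions; i++) {
                long size = base + (i < extra ? 1 : 0);
                parts.add(new LongRangeIterator(start, size));
                start += size;
            }
            return parts;
        }
    }

    // Drains every part in order; the result should equal the original sequence.
    static List<Long> drainAll(List<? extends Iterator<Long>> parts) {
        List<Long> values = new ArrayList<>();
        for (Iterator<Long> part : parts) {
            while (part.hasNext()) {
                values.add(part.next());
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(drainAll(new LongRangeIterator(0, 10).split(3)));
    }
}
```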


