Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/11 07:22:15 UTC

[GitHub] [flink] JingsongLi opened a new pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

JingsongLi opened a new pull request #12074:
URL: https://github.com/apache/flink/pull/12074


   
   ## What is the purpose of the change
   
   Introduce built-in connectors that make it easier to get started, test, and debug jobs in production.
   The datagen source enables:
   - quickly getting started with and testing streaming jobs
   - performance testing
   
   ```
   CREATE TABLE `user` (
       id BIGINT,
       age INT,
       description STRING
   ) WITH (
       'connector.type' = 'datagen',
       'connector.rows-per-second' = '100',
       'connector.total-records' = '1000000',
   
       'schema.id.generator' = 'sequence',
       'schema.id.generator.start' = '1',
   
       'schema.age.generator' = 'random',
       'schema.age.generator.min' = '0',
       'schema.age.generator.max' = '100',
   
       'schema.description.generator' = 'random',
       'schema.description.generator.length' = '100'
   )
   -- If 'schema.<field>.generator' is not set, the random generator is used by default.
   ```
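To make the per-field options above concrete, here is a minimal, standalone Java sketch of what each generator kind is meant to produce. It does not use Flink at all; the class and method names (`FieldGeneratorsSketch`, `sequence`, `randomInt`, `randomString`) are hypothetical and only mirror the `schema.<field>.generator.*` options. It assumes `min`/`max` are inclusive and that random strings are drawn from lowercase ASCII letters.

```java
import java.util.Random;
import java.util.function.Supplier;

public class FieldGeneratorsSketch {

    // 'sequence' generator: emits start, start + 1, start + 2, ...
    public static Supplier<Long> sequence(long start) {
        long[] next = {start};
        return () -> next[0]++;
    }

    // 'random' generator for INT with inclusive bounds [min, max] (assumption).
    // Assumes max - min + 1 fits in an int.
    public static Supplier<Integer> randomInt(Random rnd, int min, int max) {
        return () -> min + rnd.nextInt(max - min + 1);
    }

    // 'random' generator for STRING of a fixed length; the alphabet is an assumption.
    public static Supplier<String> randomString(Random rnd, int length) {
        return () -> {
            StringBuilder sb = new StringBuilder(length);
            for (int i = 0; i < length; i++) {
                sb.append((char) ('a' + rnd.nextInt(26)));
            }
            return sb.toString();
        };
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        Supplier<Long> id = sequence(1);
        Supplier<Integer> age = randomInt(rnd, 0, 100);
        Supplier<String> description = randomString(rnd, 100);
        // Print a few rows, truncating the long description for readability.
        for (int i = 0; i < 3; i++) {
            System.out.println(id.get() + ", " + age.get() + ", "
                    + description.get().substring(0, 10) + "...");
        }
    }
}
```

With `'connector.rows-per-second' = '100'` and `'connector.total-records' = '1000000'`, a job like the one above would need roughly 10,000 seconds (about 2.8 hours) to emit every record, so the rate option matters as much as the record count when sizing a test.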
   
   ## Brief change log
   
   Introduce:
   - DataGeneratorSource
   - DataGenTableSourceFactory
   
   ## Verifying this change
   
   - `DataGeneratorSourceTest`
   - `DataGenTableSourceFactoryTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? JavaDocs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zjuwangg commented on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
zjuwangg commented on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-628492412


   Looks good to me.





[GitHub] [flink] flinkbot commented on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 9a3c8f29181ce55ff570c9d8709950ac58d635bf UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on a change in pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #12074:
URL: https://github.com/apache/flink/pull/12074#discussion_r424893175



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source.datagen;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+/**
+ * A stateful, re-scalable {@link DataGenerator} that emits each number from a given interval
+ * exactly once, possibly in parallel.
+ */
+@Experimental
+public abstract class SequenceGenerator<T> implements DataGenerator<T> {
+
+	private final long start;
+	private final long end;
+
+	private transient ListState<Long> checkpointedState;
+	protected transient Deque<Long> valuesToEmit;
+
+	/**
+	 * Creates a DataGenerator that emits all numbers from the given interval exactly once.
+	 *
+	 * @param start Start of the range of numbers to emit.
+	 * @param end End of the range of numbers to emit.
+	 */
+	public SequenceGenerator(long start, long end) {
+		this.start = start;
+		this.end = end;
+	}
+
+	@Override
+	public void open(
+			String name,
+			FunctionInitializationContext context,
+			RuntimeContext runtimeContext) throws Exception {
+		Preconditions.checkState(this.checkpointedState == null,
+				"The " + getClass().getSimpleName() + " has already been initialized.");
+
+		this.checkpointedState = context.getOperatorStateStore().getListState(
+				new ListStateDescriptor<>(
+						name + "-sequence-state",
+						LongSerializer.INSTANCE));
+		this.valuesToEmit = new ArrayDeque<>();
+		if (context.isRestored()) {
+			// upon restoring
+
+			for (Long v : this.checkpointedState.get()) {
+				this.valuesToEmit.add(v);
+			}
+		} else {
+			// the first time the job is executed
+			final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+			final int taskIdx = runtimeContext.getIndexOfThisSubtask();
+			final long congruence = start + taskIdx;
+
+			long totalNoOfElements = Math.abs(end - start + 1);
+			final int baseSize = safeDivide(totalNoOfElements, stepSize);
+			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+
+			for (long collected = 0; collected < toCollect; collected++) {

Review comment:
       Yes, this is the same mechanism as in `StatefulSequenceSource`. It is hard to change here, so we can leave this improvement for the future.
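For readers unfamiliar with `StatefulSequenceSource`, the initial split in the diff above can be reproduced as a standalone Java sketch: subtask `taskIdx` starts at `start + taskIdx`, steps by the parallelism, and takes `baseSize` or `baseSize + 1` values. The class and method names are hypothetical; `safeDivide` is replaced by plain `long` division, and `Math.abs` is dropped on the assumption that `start <= end`.

```java
import java.util.ArrayList;
import java.util.List;

public class SequenceSplitSketch {

    // Returns the values that subtask `taskIdx` (0-based) of `parallelism` subtasks
    // would emit for the inclusive range [start, end], assuming start <= end.
    public static List<Long> valuesForSubtask(long start, long end, int parallelism, int taskIdx) {
        final long congruence = start + taskIdx;      // first value owned by this subtask
        final long total = end - start + 1;           // number of values in the whole range
        final long baseSize = total / parallelism;    // every subtask gets at least this many
        // The first (total % parallelism) subtasks take one extra value.
        final long toCollect = (total % parallelism > taskIdx) ? baseSize + 1 : baseSize;

        List<Long> values = new ArrayList<>();
        for (long collected = 0; collected < toCollect; collected++) {
            values.add(collected * parallelism + congruence);
        }
        return values;
    }

    public static void main(String[] args) {
        // Range [1, 10] over 3 subtasks.
        for (int i = 0; i < 3; i++) {
            System.out.println("subtask " + i + ": " + valuesForSubtask(1, 10, 3, i));
        }
        // prints:
        // subtask 0: [1, 4, 7, 10]
        // subtask 1: [2, 5, 8]
        // subtask 2: [3, 6, 9]
    }
}
```

Every value is owned by exactly one subtask, and because each subtask stores its remaining values as operator list state, they can be redistributed when the job is restored with a different parallelism.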







[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 62f441198c518458511ce8b6b013e878c3e9f156 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1248) 
   * ef7b6e2f25f330eb10dfa0d39f81daba85cddb58 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 9a3c8f29181ce55ff570c9d8709950ac58d635bf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=960) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 62f441198c518458511ce8b6b013e878c3e9f156 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1248) 
   * ef7b6e2f25f330eb10dfa0d39f81daba85cddb58 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1358) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 9a3c8f29181ce55ff570c9d8709950ac58d635bf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=960) 
   * 5c0324117a8a88c7a821eac0d0216a675b14962f UNKNOWN
   



[GitHub] [flink] JingsongLi commented on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-628445370


   Thanks @zjuwangg for your review. Updated.





[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 9a3c8f29181ce55ff570c9d8709950ac58d635bf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=960) 
   * 5c0324117a8a88c7a821eac0d0216a675b14962f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=977) 
   * bdee2834ef68c942778b3e435ee00a10da769061 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 0fb435bee16969f02c3ebee46beee75c2b8c074a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1035) 
   * 62f441198c518458511ce8b6b013e878c3e9f156 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 9a3c8f29181ce55ff570c9d8709950ac58d635bf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=960) 
   * 5c0324117a8a88c7a821eac0d0216a675b14962f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=977) 
   * bdee2834ef68c942778b3e435ee00a10da769061 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=988) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 0fb435bee16969f02c3ebee46beee75c2b8c074a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1035) 
   * 62f441198c518458511ce8b6b013e878c3e9f156 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1248) 
   



[GitHub] [flink] zjuwangg commented on a change in pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
zjuwangg commented on a change in pull request #12074:
URL: https://github.com/apache/flink/pull/12074#discussion_r424846810



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/SequenceGenerator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source.datagen;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+/**
+ * A stateful, re-scalable {@link DataGenerator} that emits each number from a given interval
+ * exactly once, possibly in parallel.
+ */
+@Experimental
+public abstract class SequenceGenerator<T> implements DataGenerator<T> {
+
+	private final long start;
+	private final long end;
+
+	private transient ListState<Long> checkpointedState;
+	protected transient Deque<Long> valuesToEmit;
+
+	/**
+	 * Creates a DataGenerator that emits all numbers from the given interval exactly once.
+	 *
+	 * @param start Start of the range of numbers to emit.
+	 * @param end End of the range of numbers to emit.
+	 */
+	public SequenceGenerator(long start, long end) {
+		this.start = start;
+		this.end = end;
+	}
+
+	@Override
+	public void open(
+			String name,
+			FunctionInitializationContext context,
+			RuntimeContext runtimeContext) throws Exception {
+		Preconditions.checkState(this.checkpointedState == null,
+				"The " + getClass().getSimpleName() + " has already been initialized.");
+
+		this.checkpointedState = context.getOperatorStateStore().getListState(
+				new ListStateDescriptor<>(
+						name + "-sequence-state",
+						LongSerializer.INSTANCE));
+		this.valuesToEmit = new ArrayDeque<>();
+		if (context.isRestored()) {
+			// upon restoring
+
+			for (Long v : this.checkpointedState.get()) {
+				this.valuesToEmit.add(v);
+			}
+		} else {
+			// the first time the job is executed
+			final int stepSize = runtimeContext.getNumberOfParallelSubtasks();
+			final int taskIdx = runtimeContext.getIndexOfThisSubtask();
+			final long congruence = start + taskIdx;
+
+			long totalNoOfElements = Math.abs(end - start + 1);
+			final int baseSize = safeDivide(totalNoOfElements, stepSize);
+			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+
+			for (long collected = 0; collected < toCollect; collected++) {

Review comment:
       Will collecting all elements in advance cause a performance problem? 
   For example, when the source parallelism is one and totalNoOfElements is very large.
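   To make the question concrete, below is a minimal, hypothetical sketch of an on-demand alternative. The class name `LazySequenceSlice` and its methods are illustrative and not part of the PR. It reuses the split arithmetic from the diff (`stepSize`, `taskIdx`, `congruence`, `baseSize`/`toCollect`). It assumes that the elided loop body adds `congruence + collected * stepSize` to the deque, and that `end >= start`. Each subtask keeps only a counter instead of an `ArrayDeque` holding its whole share.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Hypothetical sketch (not part of the PR): yields one subtask's share of
 * [start, end] on demand instead of filling a Deque in open().
 * Assumes the round-robin split from the diff: subtask taskIdx emits
 * start + taskIdx, start + taskIdx + stepSize, ... and assumes end >= start.
 */
class LazySequenceSlice implements Iterator<Long> {

    private final long congruence; // first value owned by this subtask
    private final int stepSize;    // number of parallel subtasks
    private final long toCollect;  // how many values this subtask owns
    private long collected = 0;    // how many values have been emitted so far

    LazySequenceSlice(long start, long end, int stepSize, int taskIdx) {
        long total = end - start + 1;
        long baseSize = total / stepSize;
        // The first (total % stepSize) subtasks take one extra value.
        this.toCollect = (total % stepSize > taskIdx) ? baseSize + 1 : baseSize;
        this.congruence = start + taskIdx;
        this.stepSize = stepSize;
    }

    @Override
    public boolean hasNext() {
        return collected < toCollect;
    }

    @Override
    public Long next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        long value = congruence + collected * stepSize;
        collected++;
        return value;
    }

    public static void main(String[] args) {
        // Split 1..10 across 3 subtasks; each slice holds O(1) state.
        for (int idx = 0; idx < 3; idx++) {
            List<Long> out = new ArrayList<>();
            LazySequenceSlice slice = new LazySequenceSlice(1, 10, 3, idx);
            while (slice.hasNext()) {
                out.add(slice.next());
            }
            System.out.println("subtask " + idx + ": " + out);
        }
    }
}
```

   Running `main` prints `subtask 0: [1, 4, 7, 10]`, `subtask 1: [2, 5, 8]` and `subtask 2: [3, 6, 9]`. The trade-off is fault tolerance. With the eager deque, the remaining values can presumably be written straight into the `ListState`, and Flink's even-split redistribution of operator list state then handles rescaling. A lazy variant would instead have to checkpoint its position, for example the `collected` counter per slice. That position does not redistribute as simply when the parallelism changes.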




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] JingsongLi commented on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-628983591


   @flinkbot run azure





[GitHub] [flink] zjuwangg commented on a change in pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
zjuwangg commented on a change in pull request #12074:
URL: https://github.com/apache/flink/pull/12074#discussion_r424836519



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSource.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source.datagen;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+/**
+ * A data generator source that abstract data generator. It can used to easy startup/test

Review comment:
       ```suggestion
    * A data generator source that abstract data generator. It can be used to easy startup/test
   ```







[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 9a3c8f29181ce55ff570c9d8709950ac58d635bf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=960) 
   * 5c0324117a8a88c7a821eac0d0216a675b14962f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=977) 
   * bdee2834ef68c942778b3e435ee00a10da769061 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=988) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi merged pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #12074:
URL: https://github.com/apache/flink/pull/12074


   





[GitHub] [flink] zjuwangg commented on a change in pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
zjuwangg commented on a change in pull request #12074:
URL: https://github.com/apache/flink/pull/12074#discussion_r424841967



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/datagen/DataGenTableSourceFactory.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources.datagen;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions.OptionBuilder;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
+import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
+import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.TableSchemaUtils;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Factory for creating configured instances of {@link DataGenTableSource} in a stream environment.
+ */
+@Experimental
+public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
+
+	public static final String IDENTIFIER = "datagen";
+
+	public static final ConfigOption<Long> ROWS_PER_SECOND =
+			key("rows-per-second")

Review comment:
       Is this indentation intentional? It looks odd.







[GitHub] [flink] flinkbot edited a comment on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626543188


   ## CI report:
   
   * 62f441198c518458511ce8b6b013e878c3e9f156 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=1248) 
   





[GitHub] [flink] flinkbot commented on pull request #12074: [FLINK-17606][table] Introduce DataGenerator connector in table

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12074:
URL: https://github.com/apache/flink/pull/12074#issuecomment-626521774


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 9a3c8f29181ce55ff570c9d8709950ac58d635bf (Mon May 11 07:25:52 UTC 2020)
   
   **Warnings:**
    * **1 pom.xml file was touched**: Check for build and licensing issues.
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>

