Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2015/06/08 12:51:50 UTC

[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/804

    [FLINK-2182] Add stateful Streaming Sequence Source

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink stateful-seq-source

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/804.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #804
    
----
commit e37e285797b26073f3a6880f1d02f62af40a537a
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-06-08T10:50:38Z

    [FLINK-2182] Add stateful Streaming Sequence Source

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/804#issuecomment-110022371
  
    I modified the ComplexIntegrationTest to make it work with the different ordering of the new checkpointable parallel sequence source. Maybe someone should have a look at it.
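
To illustrate why the ordering can differ: assuming a parallel sequence source gives each subtask every n-th number (an assumption about the split, not something this thread confirms), the merged output depends on how the subtasks interleave. A minimal sketch in plain Java, with hypothetical class and method names, of comparing the results without relying on order:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; not code from the pull request.
final class OrderInsensitiveCheckSketch {

    // Values one subtask would emit for [start, end] under an assumed stride split.
    // Overflow near Long.MAX_VALUE is ignored to keep the sketch short.
    static List<Long> valuesForSubtask(long start, long end, int parallelism, int index) {
        List<Long> out = new ArrayList<>();
        for (long v = start + index; v <= end; v += parallelism) {
            out.add(v);
        }
        return out;
    }

    // Simulates one possible arrival order: the subtask outputs concatenated.
    static List<Long> mergedOutput(long start, long end, int parallelism) {
        List<Long> merged = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            merged.addAll(valuesForSubtask(start, end, parallelism, i));
        }
        return merged;
    }

    // Order-insensitive comparison: sort copies of both lists, then compare.
    static boolean sameElements(List<Long> a, List<Long> b) {
        List<Long> x = new ArrayList<>(a);
        List<Long> y = new ArrayList<>(b);
        Collections.sort(x);
        Collections.sort(y);
        return x.equals(y);
    }

    public static void main(String[] args) {
        List<Long> sequential = valuesForSubtask(1, 10, 1, 0);
        List<Long> parallel = mergedOutput(1, 10, 3);
        System.out.println("parallel arrival order: " + parallel);
        System.out.println("equal as lists: " + sequential.equals(parallel));
        System.out.println("equal as multisets: " + sameElements(sequential, parallel));
    }
}
```

A test that asserts on the sorted result (or on a set) would stay valid whichever way the subtasks interleave.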



[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/804#discussion_r31911662
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -412,7 +412,7 @@ public void registerType(Class<?> type) {
     		if (from > to) {
     			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
     		}
    -		return fromCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
    +		return addSource(new StatefulSequenceSource(from, to), "Sequence Source").setParallelism(1);
     	}
    --- End diff --
    
    But why wouldn't we allow the user to explicitly set a higher parallelism? The call is quite explicit, and I think I would be more surprised as a user if my source did not run in parallel even though I set a higher parallelism.



[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/804



[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/804#discussion_r31913451
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -412,7 +412,7 @@ public void registerType(Class<?> type) {
     		if (from > to) {
     			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
     		}
    -		return fromCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
    +		return addSource(new StatefulSequenceSource(from, to), "Sequence Source").setParallelism(1);
     	}
    --- End diff --
    
    We discussed this briefly and decided to remove the generateParallelSequence method from StreamExecutionEnvironment, since the batch API also has only generateSequence, which can be used for both "parallelism 1" sources and parallel sources.



[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/804#discussion_r31912276
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -412,7 +412,7 @@ public void registerType(Class<?> type) {
     		if (from > to) {
     			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
     		}
    -		return fromCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
    +		return addSource(new StatefulSequenceSource(from, to), "Sequence Source").setParallelism(1);
     	}
    --- End diff --
    
    The current behaviour was that if the user tried setting the parallelism to two or more, the stream graph generator threw an exception on execute, telling the user that this is in fact not a parallel source.
    If we allow setting it the way you proposed, then there is not much use in having a separate `generateSequence` and `generateParallelSequence`. Personally I am not a big fan of the distinction, but if it is there, let us do it right. Are the batch versions different?
    This is not a release blocker though; a JIRA for 0.10 is an OK solution on my side.
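
The check described above can be sketched roughly as follows. This is a plain-Java illustration under stated assumptions: `SourceNode`, `parallelCapable`, and `validate` are hypothetical names, and the real stream graph generator may perform the check differently.

```java
// Hypothetical illustration only; not the actual stream graph generator.
final class ParallelismCheckSketch {

    // Stand-in for a source node in a stream graph (assumed shape).
    static final class SourceNode {
        final String name;
        final boolean parallelCapable;
        final int parallelism;

        SourceNode(String name, boolean parallelCapable, int parallelism) {
            this.name = name;
            this.parallelCapable = parallelCapable;
            this.parallelism = parallelism;
        }
    }

    // Rejects a parallelism of two or more on a source that is not parallel.
    static void validate(SourceNode node) {
        if (!node.parallelCapable && node.parallelism > 1) {
            throw new IllegalArgumentException(
                    "Source '" + node.name + "' is not a parallel source; parallelism must be 1");
        }
    }

    public static void main(String[] args) {
        validate(new SourceNode("Sequence Source", true, 4)); // accepted
        try {
            validate(new SourceNode("Sequence Source", false, 2));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a guard like this in place, calling `setParallelism(1)` on a source that is in fact parallel only sets a default that the user can later override, which is the concern raised in this discussion.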



[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/804#issuecomment-110350898
  
    Looks good now, we should not really test the ordering there anyway.



[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/804#discussion_r31910945
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +/**
    + * A stateful streaming source that emits each number from a given interval exactly once,
    + * possibly in parallel.
    + */
    +public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
    +	private static final long serialVersionUID = 1L;
    +
    +	private final long start;
    +	private final long end;
    +
    +	private long collected;
    +
    +	private volatile boolean isRunning = true;
    +
    +	/**
    +	 * Creates a source that emits all numbers from the given interval exactly once.
    +	 *
    +	 * @param start Start of the range of numbers to emit.
    +	 * @param end End of the range of numbers to emit.
    +	 */
    +	public StatefulSequenceSource(long start, long end) {
    +		this.start = start;
    +		this.end = end;
    +		this.collected = 0;
    +	}
    +
    +	@Override
    +	public void run(SourceContext<Long> ctx) throws Exception {
    +		final Object checkpointLock = ctx.getCheckpointLock();
    +
    +		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
    +
    --- End diff --
    
    We can get away without casting this; nothing streaming-specific is used.
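
For context on what "stateful" buys here, a minimal plain-Java sketch follows. It assumes the source only needs its subtask index and the parallelism (passed as plain ints below) plus the `collected` counter as checkpointed state. The class name, the simplified `snapshotState`/`restoreState` signatures, and the stride split are assumptions for illustration, not the code in the truncated diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the StatefulSequenceSource from the diff.
final class CheckpointedSequenceSketch {
    private final long start;
    private final long end;
    private final int parallelism;
    private final int index;

    // Number of values this subtask has emitted so far; the only checkpointed state.
    private long collected;

    CheckpointedSequenceSketch(long start, long end, int parallelism, int index) {
        this.start = start;
        this.end = end;
        this.parallelism = parallelism;
        this.index = index;
        this.collected = 0;
    }

    // Simplified snapshot/restore pair (assumed signatures).
    long snapshotState() {
        return collected;
    }

    void restoreState(long state) {
        this.collected = state;
    }

    // Emits at most maxRecords further values into out. A real source would
    // emit and increment under the checkpoint lock so that a snapshot never
    // observes a value that was emitted but not yet counted.
    void emit(int maxRecords, List<Long> out) {
        for (int i = 0; i < maxRecords; i++) {
            long next = start + index + collected * parallelism;
            if (next > end) {
                return;
            }
            out.add(next);
            collected++;
        }
    }

    public static void main(String[] args) {
        List<Long> out = new ArrayList<>();
        CheckpointedSequenceSketch first = new CheckpointedSequenceSketch(0, 9, 2, 1);
        first.emit(2, out);
        long state = first.snapshotState();

        // Simulated failure: a fresh instance resumes from the snapshot.
        CheckpointedSequenceSketch recovered = new CheckpointedSequenceSketch(0, 9, 2, 1);
        recovered.restoreState(state);
        recovered.emit(10, out);
        System.out.println(out);
    }
}
```

Because the next value is derived from `collected` alone, restoring the counter is enough to resume without repeating or skipping numbers in this sketch.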



[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/804#discussion_r31910888
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -412,7 +412,7 @@ public void registerType(Class<?> type) {
     		if (from > to) {
     			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
     		}
    -		return fromCollection(new NumberSequenceIterator(from, to), BasicTypeInfo.LONG_TYPE_INFO, "Sequence Source");
    +		return addSource(new StatefulSequenceSource(from, to), "Sequence Source").setParallelism(1);
     	}
    --- End diff --
    
    This is a bit hacky. The problem is that this solution permits the user to manually set a higher parallelism later, which breaks the assumptions about non-parallel sources.



[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/804#discussion_r31911518
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.api.functions.source;
    +
    +
    +import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +
    +/**
    + * A stateful streaming source that emits each number from a given interval exactly once,
    + * possibly in parallel.
    + */
    +public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
    +	private static final long serialVersionUID = 1L;
    +
    +	private final long start;
    +	private final long end;
    +
    +	private long collected;
    +
    +	private volatile boolean isRunning = true;
    +
    +	/**
    +	 * Creates a source that emits all numbers from the given interval exactly once.
    +	 *
    +	 * @param start Start of the range of numbers to emit.
    +	 * @param end End of the range of numbers to emit.
    +	 */
    +	public StatefulSequenceSource(long start, long end) {
    +		this.start = start;
    +		this.end = end;
    +		this.collected = 0;
    +	}
    +
    +	@Override
    +	public void run(SourceContext<Long> ctx) throws Exception {
    +		final Object checkpointLock = ctx.getCheckpointLock();
    +
    +		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
    +
    --- End diff --
    
    Yes, no idea what I did there. Will change it.



[GitHub] flink pull request: [FLINK-2182] Add stateful Streaming Sequence S...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/804#issuecomment-110257970
  
    Thanks for the heads-up. Looking. :)

