Posted to issues@flink.apache.org by szape <gi...@git.apache.org> on 2015/06/20 13:05:49 UTC

[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

GitHub user szape opened a pull request:

    https://github.com/apache/flink/pull/853

    [FLINK-2243] [storm-compat] Added finite spout functionality to Storm compatibility layer

    Added finite spout functionality to Storm compatibility layer
    
    Storm spouts emit infinite streams. Since Flink also supports finite source streams, users of Flink's Storm compatibility layer should be given an interface that can make any Storm spout finite.
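A minimal, Storm-free Java sketch of what "finite" means here: a spout that can report `reachedEnd()`, and a driver loop that stops calling `nextTuple()` once it does, so the source can finish. The names `FiniteSpout`, `InMemoryFiniteSpout` and `runToCompletion` are hypothetical and a `Consumer<String>` stands in for Storm's output collector; the sketch only mirrors the shape of the `reachedEnd()` method of `FiniteStormFileSpout` in this PR, not the exact signatures of the PR's interfaces or wrappers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Storm-free sketch of a "finite spout"; not the PR's actual API.
public class FiniteSpoutSketch {

    /** A spout that, unlike a plain Storm spout, can report that its input is exhausted. */
    interface FiniteSpout {
        /** Emits at most one tuple per call. */
        void nextTuple();

        /** Returns true once no further tuples will be emitted. */
        boolean reachedEnd();
    }

    /** Emits the elements of a fixed array, one per nextTuple() call. */
    static final class InMemoryFiniteSpout implements FiniteSpout {
        private final String[] data;
        private final Consumer<String> collector; // stands in for Storm's SpoutOutputCollector
        private int index = 0;

        InMemoryFiniteSpout(String[] data, Consumer<String> collector) {
            this.data = data;
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            if (!reachedEnd()) {
                collector.accept(data[index++]);
            }
        }

        @Override
        public boolean reachedEnd() {
            return index >= data.length;
        }
    }

    /** Driver loop: pull tuples until the spout reports its end, then return (the source finishes). */
    static void runToCompletion(FiniteSpout spout) {
        while (!spout.reachedEnd()) {
            spout.nextTuple();
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        runToCompletion(new InMemoryFiniteSpout(new String[] {"to", "be", "or"}, out::add));
        System.out.println(out); // prints [to, be, or]
    }
}
```

A plain Storm spout has no equivalent of `reachedEnd()`, so a wrapper around it can only loop forever; exposing the end condition is what lets the loop terminate.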

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mbalassi/flink finite-storm-spout

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/853.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #853
    
----
commit ce3c1cb6d80c51dcb2e27409448ce84eeec96f12
Author: szape <ne...@gmail.com>
Date:   2015-06-19T08:18:09Z

    [storm-compat] Removed failing simple join example

commit d0a80b382c5de332f91e231cd9530efd357cd051
Author: szape <ne...@gmail.com>
Date:   2015-06-19T08:22:04Z

    [storm-compat] Added finite spout functionality to Storm compatibility layer

commit d1a325e9f287300c5eca2bdfb90027aa5f27b6a6
Author: szape <ne...@gmail.com>
Date:   2015-06-19T08:43:10Z

    [storm-compat] Demonstrating finite Storm spout functionality on exclamation example
    -minor renaming
    -improving JavaDocs

commit 4a59aa6704d3bd4b9a6e334ed93160a3e4eea68f
Author: szape <ne...@gmail.com>
Date:   2015-06-19T08:44:32Z

    [storm-compat] Fixing JavaDocs in word count example

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by szape <gi...@git.apache.org>.
Github user szape commented on a diff in the pull request:

    https://github.com/apache/flink/pull/853#discussion_r32925004
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java ---
    @@ -1,90 +0,0 @@
    -/*
    --- End diff --
    
    Okay. I just didn't want a half-ready example to linger around. I will
    remove that commit.
    
    2015-06-22 11:58 GMT+02:00 Matthias J. Sax <no...@github.com>:
    
    > In
    > flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java
    > <https://github.com/apache/flink/pull/853#discussion_r32919318>:
    >
    > > @@ -1,90 +0,0 @@
    > > -/*
    >
    > Why do you remove it? It's going to be fixed soon.




[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by szape <gi...@git.apache.org>.
Github user szape commented on the pull request:

    https://github.com/apache/flink/pull/853#issuecomment-120652832
  
    No, it is finished and waiting for acceptance. As I recall, Márton wanted
    to review the code before merging it into master.
    
    Peter
    
    2015-07-11 16:27 GMT+02:00 Matthias J. Sax <no...@github.com>:
    
    > Any news on this PR? Do you still work on it?




[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/853#issuecomment-123705638
  
    Can you also extend the documentation for this feature? (I followed an "only-1-sentence-in-1-line" approach; please do the same.) I would suggest opening a new section "Flink extensions" before "Storm Compatibility Examples".



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/853#discussion_r32888046
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java ---
    @@ -145,7 +145,7 @@ private static boolean parseParameters(final String[] args) {
     		}
     
     		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
    -				TypeExtractor.getForClass(String.class));
    +				TypeExtractor.getForClass(String.class)).setParallelism(1);
    --- End diff --
    
    Why do you fix the parallelism here?



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/853#issuecomment-120625825
  
    Any news on this PR? Do you still work on it?



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/853#discussion_r32888034
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java ---
    @@ -0,0 +1,100 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.stormcompatibility.util;
    +
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Values;
    +import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
    +
    +import java.io.BufferedReader;
    +import java.io.FileNotFoundException;
    +import java.io.FileReader;
    +import java.io.IOException;
    +import java.util.Map;
    +
    +/**
    + * Implements a Storm Spout that reads data from a given local file. The spout stops automatically
    + * when it reached the end of the file.
    + */
    +public class FiniteStormFileSpout extends AbstractStormSpout implements FiniteStormSpout {
    +	private static final long serialVersionUID = -6996907090003590436L;
    +
    +	private final String path;
    +	private BufferedReader reader;
    +	private String line;
    +	private boolean newLineRead;
    +
    +	public FiniteStormFileSpout(final String path) {
    +		this.path = path;
    +	}
    +
    +	@SuppressWarnings("rawtypes")
    +	@Override
    +	public void open(final Map conf, final TopologyContext context,
    +			final SpoutOutputCollector collector) {
    +		super.open(conf, context, collector);
    +		try {
    +			this.reader = new BufferedReader(new FileReader(this.path));
    +		} catch (final FileNotFoundException e) {
    +			throw new RuntimeException(e);
    +		}
    +		newLineRead = false;
    +	}
    +
    +	@Override
    +	public void close() {
    +		if (this.reader != null) {
    +			try {
    +				this.reader.close();
    +			} catch (final IOException e) {
    +				throw new RuntimeException(e);
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void nextTuple() {
    +		if (!reachedEnd()) {
    +			this.collector.emit(new Values(line));
    +		}
    +		newLineRead = false;
    +	}
    +
    +	/**
    +	 * Can be called before nextTuple() any times including 0.
    +	 */
    +	public boolean reachedEnd() {
    +		try {
    +			readLine();
    +		} catch (IOException e) {
    +			e.printStackTrace();
    --- End diff --
    
    Is this exception used to signal the end of the stream, or is it an error condition?
    In any case, calling `e.printStackTrace()` is not good, because it bypasses the logger and ends up in the stdout file.
    I would rethrow a runtime exception instead.
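A minimal sketch of the suggested fix, assuming a plain `java.io.Reader` in place of the file path and a `Consumer<String>` in place of the Storm collector. With `BufferedReader.readLine()`, the end of the stream is signalled by a `null` return value, so an `IOException` is a genuine error and can be rethrown as an `UncheckedIOException` (a `RuntimeException`). `LineSpoutSketch` is a hypothetical stand-in for `FiniteStormFileSpout`; it keeps a read-ahead flag like the diff's `newLineRead` so that `reachedEnd()` can be called any number of times before `nextTuple()`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for FiniteStormFileSpout: reads from any Reader and emits to a Consumer.
public class LineSpoutSketch {
    private final BufferedReader reader;
    private final Consumer<String> collector;
    private String line;         // line fetched ahead of time by reachedEnd()
    private boolean newLineRead; // true while 'line' has been fetched but not yet consumed

    public LineSpoutSketch(Reader source, Consumer<String> collector) {
        this.reader = new BufferedReader(source);
        this.collector = collector;
    }

    /** Can be called any number of times (including zero) before nextTuple(). */
    public boolean reachedEnd() {
        if (!newLineRead) {
            try {
                // End of stream is signalled by null, not by an exception.
                line = reader.readLine();
            } catch (IOException e) {
                // A real I/O failure: propagate it instead of printing the stack trace.
                throw new UncheckedIOException(e);
            }
            newLineRead = true;
        }
        return line == null;
    }

    public void nextTuple() {
        if (!reachedEnd()) {
            collector.accept(line);
        }
        newLineRead = false; // the next reachedEnd() call fetches a fresh line
    }

    public void close() {
        try {
            reader.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        LineSpoutSketch spout = new LineSpoutSketch(new StringReader("a\nb\n"), out::add);
        spout.reachedEnd();
        spout.reachedEnd(); // repeated calls do not skip a line
        while (!spout.reachedEnd()) {
            spout.nextTuple();
        }
        spout.close();
        System.out.println(out); // prints [a, b]
    }
}
```

Rethrowing lets the task fail visibly with the original cause attached, rather than silently treating a read error as the end of the input.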



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/853



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/853#discussion_r32919318
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java ---
    @@ -1,90 +0,0 @@
    -/*
    --- End diff --
    
    Why do you remove it? It's going to be fixed soon.



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/853#discussion_r32925744
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/singlejoin/SingleJoinTopology.java ---
    @@ -1,90 +0,0 @@
    -/*
    --- End diff --
    
    When you prepare a PR, limit the changes to the feature you are implementing. This example is unrelated to the new functionality, so you should just leave it as it is ;)



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/853#issuecomment-128692750
  
    Nice :)



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by szape <gi...@git.apache.org>.
Github user szape commented on a diff in the pull request:

    https://github.com/apache/flink/pull/853#discussion_r32913023
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java ---
    @@ -0,0 +1,100 @@
    +	/**
    +	 * Can be called before nextTuple() any times including 0.
    +	 */
    +	public boolean reachedEnd() {
    +		try {
    +			readLine();
    +		} catch (IOException e) {
    +			e.printStackTrace();
    --- End diff --
    
    Thanks for pointing it out! I fixed it.



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/853#discussion_r32914310
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java ---
    @@ -145,7 +145,7 @@ private static boolean parseParameters(final String[] args) {
     		}
     
     		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
    -				TypeExtractor.getForClass(String.class));
    +				TypeExtractor.getForClass(String.class)).setParallelism(1);
    --- End diff --
    
    It is correct that the degree of parallelism (dop) must be set to one: the spout used here is not parallelizable.



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by szape <gi...@git.apache.org>.
Github user szape commented on the pull request:

    https://github.com/apache/flink/pull/853#issuecomment-125108629
  
    Of course, I will do it.
    
    2015-07-22 14:31 GMT+02:00 Matthias J. Sax <no...@github.com>:
    
    > Can you also extend documentation for this feature? (I followed a
    > "only-1-sentence-in-1-line" approach; please do the same). I would suggest
    > to open a new section "Flink extensions" before "Storm Compatibility
    > Examples".




[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the pull request:

    https://github.com/apache/flink/pull/853#issuecomment-127944348
  
    If there are no objections, I will merge this in the evening (maybe also squashing the commits a bit).



[GitHub] flink pull request: [FLINK-2243] [storm-compat] Added finite spout...

Posted by szape <gi...@git.apache.org>.
Github user szape commented on a diff in the pull request:

    https://github.com/apache/flink/pull/853#discussion_r32913535
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java ---
    @@ -145,7 +145,7 @@ private static boolean parseParameters(final String[] args) {
     		}
     
     		return env.addSource(new StormFiniteSpoutWrapper<String>(new StormInMemorySpout(WordCountData.WORDS), true),
    -				TypeExtractor.getForClass(String.class));
    +				TypeExtractor.getForClass(String.class)).setParallelism(1);
    --- End diff --
    
    I thought it was a mistake, because without setting the parallelism to 1 the input is duplicated (every parallel instance of the spout emits the full data set). It does not affect the ITCases directly, because they run in 'file input/output' mode, but it still seemed wrong.
    Any reason against setting the parallelism?
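To make the duplication argument concrete, the following plain-Java simulation (no Flink involved; all names are hypothetical) runs N copies of a source that, like the in-memory word-count spout, ignores its subtask index and emits the whole data set. For contrast it also shows a partitioned variant in which each copy emits only every N-th element, which is roughly what "parallelizable" would require of a source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java simulation; no Flink involved. Each outer-loop iteration plays one parallel source instance.
public class ParallelSourceDuplication {

    /** Every instance emits the full data set, as a source that ignores its subtask index would. */
    static List<String> runNonParallelizableSource(String[] data, int parallelism) {
        List<String> emitted = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            Collections.addAll(emitted, data);
        }
        return emitted;
    }

    /** Contrast: each instance emits only the elements whose index i satisfies i % parallelism == subtask. */
    static List<String> runPartitionedSource(String[] data, int parallelism) {
        List<String> emitted = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (int i = subtask; i < data.length; i += parallelism) {
                emitted.add(data[i]);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] words = {"a", "b", "c"};
        System.out.println(runNonParallelizableSource(words, 1).size()); // prints 3
        System.out.println(runNonParallelizableSource(words, 4).size()); // prints 12 (each word 4 times)
        System.out.println(runPartitionedSource(words, 4).size());       // prints 3
    }
}
```

With parallelism 1 both variants behave the same, which is why fixing the source's parallelism to 1 is the simplest correct choice for a spout that cannot split its input.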

