You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by ffbin <gi...@git.apache.org> on 2015/08/24 11:12:38 UTC

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

GitHub user ffbin opened a pull request:

    https://github.com/apache/flink/pull/1046

    [FLINK-2525]Add configuration support in Storm-compatibility

    - enable config can used in Spouts.open() and Bout.prepare().
    
    Example like this:
    public static void main(final String[] args) {
    	String topologyId = "Streaming WordCount";
    	final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
    	...
    	final Config conf = new Config();
    	conf.put("wordsFile", "/home/user/");
    	conf.put("delimitSize", 1024);
    	final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
    	cluster.submitTopology(topologyId, conf, builder.createTopology());
    	Utils.sleep(10 * 1000);
    	cluster.killTopology(topologyId);
    	cluster.shutdown();	
    }
    
    public class WordReader implements IRichSpout {
    	....
    	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    		try {
    			this.context = context;
    			this.fileReader = new FileReader(conf.get("wordsFile"));
    		} catch (FileNotFoundException e) {
    			throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
    		}
    		this.collector = collector;
    	}
    }
    
    public final class StormBoltTokenizer implements IRichBolt {
    	....
    	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    		this.delimitSize = stormConf.get("delimitSize");
    		this.collector = collector;
    	}	
    }


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ffbin/flink FLINK-2525

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1046.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1046
    
----
commit c6aebc10b7a010cc9cd5fb5b6505fdbc942ab7b9
Author: ffbin <86...@qq.com>
Date:   2015-08-24T09:07:26Z

    [FLINK-2525]Add configuration support in Storm-compatibility

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38866736
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java ---
    @@ -107,4 +112,40 @@ public static TopologyContext convertToTopologyContext(final StreamingRuntimeCon
     		return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
     	}
     
    +	/**
    +	 * Get storm configuration from StreamingRuntimeContext.
    +	 *
    +	 * @param ctx
    +	 *            The RuntimeContext of operator.
    +	 * @return The storm configuration map.
    +	 * @throws Exception
    --- End diff --
    
    JavaDoc incomplete


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-140224226
  
    It is not clear (at least to me) how to do this. The API does not offer an (obvious) way to set a configuration... (or I just don't get it). `StreamExecutionEnvironment` only offers `.getConfig()` and there is no `.withParameters(...)` in Streaming API (which is "deprecated" according to the discussion on the dev list).
    
    IHMO, the best way would be the possibility to set a configuration in the environment that is distributed to all operators. Should be extend Streaming API for this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-137748448
  
    The changes suggested above help with a cleaner separation between the application (here storm compatibility) and the core code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38312802
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
    @@ -275,7 +275,14 @@
     	 * Path to Hadoop configuration
     	 */
     	public static final String PATH_HADOOP_CONFIG = "fs.hdfs.hadoopconf";
    -	
    +
    +	// ------------------------ Storm Configuration ------------------------
    +
    +	/**
    +	 * storm configuration
    +	 */
    +	public static final String STORM_DEFAULT_CONFIG = "storm.config";
    --- End diff --
    
    I think we should put this somewhere else... Maybe adding a new class in `stormcompatibility.util` package. @StephanEwen what is your opinion?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-140291773
  
    As per discussion on the dev list, the `ExecuionConfig` has the `GlobalJobParameters`, which are useful if one type of config is used across all operators.
    
    If each of the operators needs its own config, can you create an abstract base class for the storm functions which takes a configuration as an argumen?
    
    BTW: There is no plan to remove the `withParameters()` method in the batch API. It is just not the encouraged mechanism any more...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135403521
  
    @ffbin: Can you try if you can simply put the serialized Storm Config as a byte[] into the Flink configuration? You can the unpack it inside the storm code, when needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135085374
  
    Putting a nested "stormConf" into the configuration seems just wrong, sorry. Such a specific hack in a generic utility cannot yield maintainable code.
    
    Why is that needed in the first place? Why not have a dedicated configuration object for storm?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38844529
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java ---
    @@ -107,4 +112,40 @@ public static TopologyContext convertToTopologyContext(final StreamingRuntimeCon
     		return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
     	}
     
    +	/**
    +	 * Get storm configuration from StreamingRuntimeContext.
    +	 *
    +	 * @param ctx
    +	 *            The RuntimeContext of operator.
    +	 * @return The storm configuration map.
    +	 * @throws Exception
    +	 */
    +	public static Map getStormConfFromContext(final RuntimeContext ctx)
    +			throws Exception {
    +		Map stormConf = null;
    +		if (ctx instanceof StreamingRuntimeContext) {
    +			Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
    +
    +			if (jobConfiguration != null) {
    +				/* topologies mode */
    +				stormConf = (Map) InstantiationUtil.readObjectFromConfig(jobConfiguration, StormConfig.STORM_DEFAULT_CONFIG, Map.class.getClassLoader());
    --- End diff --
    
    Since the map is untyped, it might happen that users pass arbitrary objects, containing classes from the user code into the Map.
    This would lead to class not found exceptions when running the code on clusters. Can you use the classloader of `StormWrapperSetupHelper´ ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37741464
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java ---
    @@ -52,6 +75,32 @@ public void testRunExecuteCancelInfinite() throws Exception {
     	}
     
     	@Test
    +	public void testOpen() throws Exception {
    +		final IRichSpout spout = mock(IRichSpout.class);
    +		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
    +
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
    +		jobConfiguration.setInteger(new String("delimitSize"), 1024);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +
    +		spoutWrapper.setRuntimeContext(runtimeContext);
    +		spoutWrapper.open(mock(Configuration.class));
    +		final SourceFunction.SourceContext ctx = mock(SourceFunction.SourceContext.class);
    +		spoutWrapper.cancel();
    +		spoutWrapper.run(ctx);
    +		verify(spout).open(any(Map.class), any(TopologyContext.class), any(SpoutOutputCollector.class));
    +	}
    +
    --- End diff --
    
    Some comment as in `StormBoltWrapperTest`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-136636200
  
    @mjsax @StephanEwen I have finish the code change.Can you give me some comment? Thank you very much!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37745758
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java ---
    @@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
     
     	@SuppressWarnings("unchecked")
     	@Test
    +	public void testOpenWithStormConf() throws Exception {
    +		final IRichBolt bolt = mock(IRichBolt.class);
    +		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
    +
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
    +		jobConfiguration.setInteger(new String("delimitSize"), 1024);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +
    +		wrapper.setup(mock(Output.class), ctx);
    +		wrapper.open(mock(Configuration.class));
    +
    +		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
    --- End diff --
    
    Thanks.I will check the map.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37746734
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java ---
    @@ -52,6 +75,32 @@ public void testRunExecuteCancelInfinite() throws Exception {
     	}
     
     	@Test
    +	public void testOpen() throws Exception {
    +		final IRichSpout spout = mock(IRichSpout.class);
    +		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
    +
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
    +		jobConfiguration.setInteger(new String("delimitSize"), 1024);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +
    +		spoutWrapper.setRuntimeContext(runtimeContext);
    +		spoutWrapper.open(mock(Configuration.class));
    +		final SourceFunction.SourceContext ctx = mock(SourceFunction.SourceContext.class);
    +		spoutWrapper.cancel();
    +		spoutWrapper.run(ctx);
    +		verify(spout).open(any(Map.class), any(TopologyContext.class), any(SpoutOutputCollector.class));
    +	}
    +
    --- End diff --
    
    Thanks.I will check the map.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-138865484
  
    @mjsax Thanks. I have finish the change about all comments.
    @StephanEwen @rmetzger  Can you have a look at it if it can be merged? Thank you very much!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135399187
  
    I think the `byte[]`converting approach is the correct way to go. Storm config keys must not be `String`, thus the specific prefix trick cannot be applied.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38866357
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java ---
    @@ -107,4 +112,40 @@ public static TopologyContext convertToTopologyContext(final StreamingRuntimeCon
     		return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
     	}
     
    +	/**
    +	 * Get storm configuration from StreamingRuntimeContext.
    +	 *
    +	 * @param ctx
    +	 *            The RuntimeContext of operator.
    +	 * @return The storm configuration map.
    +	 * @throws Exception
    +	 */
    +	public static Map getStormConfFromContext(final RuntimeContext ctx)
    +			throws Exception {
    +		Map stormConf = null;
    --- End diff --
    
    Please add a test for this method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-141851757
  
    Hi @fhueske , i will update the PR to use ExecutionConfig. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38382368
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---
    @@ -97,9 +106,42 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
     	}
     
     	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		stormConf = new HashMap();
    +
    +		/* parameters is task configuration, we can get storm configuration only from job configuration */
    +		RuntimeContext ctx = super.getRuntimeContext();
    +		if (ctx instanceof StreamingRuntimeContext) {
    +			Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
    +
    --- End diff --
    
    The open() is usually called by openAllOperators(), and the Configuration config parameter is usually task Configuration, not job Configuration. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-140301378
  
    Storm only supports one global configuration that is shared over all spout/bolts. So `GlobalJobParameter` will work just fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135006223
  
    @ffbin Can you extend `ExclamationTopology` such that the number of added `! `in `ExclamationBolt` and `ExclamationWithStormSpout.ExclamationMap` is configurable and adapt the tests accordingly. Please add an additional user parameter to `ExclamationWithStormSpout` and `ExclamationWithStormBolt`. Furhtermore, please **extend** `FiniteStormFileSpout` or base class with an empty constructor and configure the file to be opened via Storm configuration Map for this case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38311967
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java ---
    @@ -205,7 +211,35 @@ public void open(final Configuration parameters) throws Exception {
     					this.numberOfAttributes, flinkCollector));
     		}
     
    -		this.bolt.prepare(null, topologyContext, stormCollector);
    +		Map stormConf = new HashMap();
    --- End diff --
    
    Seems to be the same code as in `AbstractStormSpoutWrapper.open(...)`. Can you unify it, ie, add a new static method to `StormWrapperSetupHelper` that does it, and just call it here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38914053
  
    --- Diff: docs/apis/storm_compatibility.md ---
    @@ -201,6 +201,26 @@ DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt f
     
     See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java) for a full example.
     
    +## Configure for embedded Spouts/Bolts
    +Embedder spouts/bolts can be configure with user defined parameters. User defined parameters is a config `Map` and be called by `Spout.open(...)` and `Bolt.prepare()`
    +as first parameter. Configuration can be used in storm topologies mode or flink mode.
    +
    + 1.Storm topologies mode example
    --- End diff --
    
    I don't think we need example code for embedded mode. (See "multiple output streams":  If a whole topology is executed using FlinkTopologyBuilder etc., there is no special attention required – it works as in regular Storm. ). A simple sentence in the paragraph should be sufficient.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38312505
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java ---
    @@ -35,6 +35,10 @@
     	private String line;
     	private boolean newLineRead;
     
    +	public FiniteStormFileSpout() {
    +		super();
    --- End diff --
    
    no necessary. is called automatically. make a single line `public FiniteStormFileSpout() {}`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135411086
  
    @StephanEwen Hi. Your suggestion is good.I think i can serialized the key and value together as the value of confData in Job configuration.And create stormconf prefix(like "stormcong_1") as key of confData.Then get  storm conf from job configuration and add them into task configuration.I will have a try.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37750149
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java ---
    @@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
     
     	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
     			final SubmitOptions submitOpts) throws Exception {
    -		ClusterUtil
    -				.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
    +		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
    +		Configuration jobConfiguration = jobGraph.getJobConfiguration();
    +
    +		/* storm conf type must be Map<String, Object> */
    +		Map<String, Object> stormConf = (Map<String, Object>)conf;
    --- End diff --
    
    Thanks.I will change this and maybe the Configuration should add a new map, HashMap<String, Object> confData is not enough.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38913193
  
    --- Diff: docs/apis/storm_compatibility.md ---
    @@ -201,6 +201,26 @@ DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt f
     
     See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java) for a full example.
     
    +## Configure for embedded Spouts/Bolts
    +Embedder spouts/bolts can be configure with user defined parameters. User defined parameters is a config `Map` and be called by `Spout.open(...)` and `Bolt.prepare()`
    +as first parameter. Configuration can be used in storm topologies mode or flink mode.
    +
    + 1.Storm topologies mode example
    +
    +    ...
    +	Map conf = new HashMap();
    --- End diff --
    
    Can you please use the same markup code as in the other code examples.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38312112
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java ---
    @@ -86,11 +99,40 @@ public static void main(final String[] args) throws Exception {
     	// USER FUNCTIONS
     	// *************************************************************************
     
    -	private static class ExclamationMap implements MapFunction<String, String> {
    +	private static class ExclamationMap extends AbstractRichFunction implements MapFunction<String, String> {
    +
    +		private String exclamation;
    +
    +		@Override
    +		public void open(Configuration parameters) throws Exception {
    +			exclamation = new String("!!!");
    --- End diff --
    
    move this to new `else` in line 121 to make clear it is the default value in case no config is given.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-134198533
  
    @mjsax .The reason why i can not see Travis details is that(from reply mail):
    The problem is that our CDN is currently blocked in mainland China. I'm talking to our CDN provider right now for getting a custom SSL certificate and domain set up, so we should be usable from China within the next weeks hopefully.
    
    I will fix the code.I only run the test of core, and miss the test in example.It is my fault.Thanks!
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38844654
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java ---
    @@ -222,6 +242,64 @@ public void testOpenSink() throws Exception {
     
     	@SuppressWarnings("unchecked")
     	@Test
    +	public void testOpenWithStormConf() throws Exception {
    +		final IRichBolt bolt = mock(IRichBolt.class);
    +		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
    +
    +		Map stormConf = new HashMap();
    +		stormConf.put(new String("path"), new String("/home/user/file.txt"));
    --- End diff --
    
    Just use `"path`" here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38865859
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---
    @@ -97,9 +102,20 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
     	}
     
     	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		config = new HashMap();
    +
    +		/* parameters is task configuration, we can get storm configuration only from job configuration */
    +		Map stormConf = StormWrapperSetupHelper.getStormConfFromContext(super.getRuntimeContext());
    +		if (stormConf != null) {
    +			config.putAll(stormConf);
    +		}
    +	}
    --- End diff --
    
    I just had a closer look at the code. Because we do not use the given `Configuration`, we should move the code into `run` method and remove the overwrite of `open()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37744576
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---
    @@ -97,9 +103,21 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
     	}
     
     	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		/* parameters is task configuration, we can get storm configuration only from job configuration */
    +		RuntimeContext ctx = super.getRuntimeContext();
    +		if (ctx instanceof StreamingRuntimeContext)
    +		{
    +			Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
    +			stormConf = new HashMap<String, Object>();
    +			stormConf.putAll(jobConfiguration.getConfData());
    --- End diff --
    
    I will add null test.In fact, jobConfiguration can not be null. Because StromBoltWrapper unit test mock StreamingRuntimeContext object, so its jobConfiguration  can be null.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38860604
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java ---
    @@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
     
     	@SuppressWarnings("unchecked")
     	@Test
    +	public void testOpenWithStormConf() throws Exception {
    +		final IRichBolt bolt = mock(IRichBolt.class);
    +		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
    +
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
    +		jobConfiguration.setInteger(new String("delimitSize"), 1024);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +
    +		wrapper.setup(mock(Output.class), ctx);
    +		wrapper.open(mock(Configuration.class));
    --- End diff --
    
    Now I understand. This is the unused `TaskConfiguration`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r39452503
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---
    @@ -418,6 +418,17 @@ public void setBytes(String key, byte[] bytes) {
     		}
     	}
     
    +	/**
    +	 * Returns the clone of confData.
    +	 *
    +	 * @return the clone of confData
    +	 */
    +	public HashMap<String, Object> getConfDataClone() {
    +		synchronized (this.confData) {
    +			return new HashMap<String, Object>(this.confData);
    --- End diff --
    
    This will just create a shallow copy, not a deep copy. Is that good enough?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38311822
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---
    @@ -97,9 +106,42 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
     	}
     
     	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		stormConf = new HashMap();
    +
    +		/* parameters is task configuration, we can get storm configuration only from job configuration */
    +		RuntimeContext ctx = super.getRuntimeContext();
    +		if (ctx instanceof StreamingRuntimeContext) {
    +			Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
    +
    --- End diff --
    
    Why do you no use the function input parameter `open(Configuration parameters)`? Is it different?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38312308
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java ---
    @@ -101,20 +147,23 @@ public String map(String value) throws Exception {
     	private static boolean fileOutput = false;
     	private static String textPath;
     	private static String outputPath;
    +	private static int exclamationNum;
     
     	private static boolean parseParameters(final String[] args) {
     
     		if (args.length > 0) {
     			// parse input arguments
     			fileOutput = true;
    -			if (args.length == 2) {
    +			if (args.length == 3) {
     				textPath = args[0];
     				outputPath = args[1];
    +				exclamationNum = Integer.parseInt(args[2]);
     			} else {
     				System.err.println("Usage: ExclamationWithStormSpout <text path> <result path>");
    --- End diff --
    
    param list


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-138267349
  
    @rmetzger Thanks.I have use the classloader of `StormWrapperSetupHelper´ instead of 'map' and change the fashion of creating Strings. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-137748262
  
    Concerning the changes at the core classes:
      - The storm config key is an application specific key and not part of the system configuration, therefore it should be defined as part of the application code.
      - When returning the JobManager configuration, this should return an unmodifiable configuration, so the internal config cannot be altered.
      - Do you need access to the inner config hashmap? How about storing the storm specific properties in a key like this: `InstantiationUtil.writeObjectToConfig(stormProps, config, "STORM_CONF");`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37745701
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java ---
    @@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
     
     	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
     			final SubmitOptions submitOpts) throws Exception {
    -		ClusterUtil
    -				.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
    +		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
    +		Configuration jobConfiguration = jobGraph.getJobConfiguration();
    +
    +		/* storm conf type must be Map<String, Object> */
    +		Map<String, Object> stormConf = (Map<String, Object>)conf;
    --- End diff --
    
    It is not about `HashMap` vs `Map`. It's about the generic types. You should not use `Map<String,Object>` but raw type `Map`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-141550369
  
    Hi @ffbin,
    not sure if you followed the discussion on the mailing list, but we discussed to use the ExecutionConfig instead of the JobConfig. The reason is that ExecutionConfig is user-facing and JobConfig is used for system internal configurations. 
    
    See the discussion [here](https://mail-archives.apache.org/mod_mbox/flink-dev/201509.mbox/%3CCANC1h_tVL1NHpoYrB9LaGRUP=TsYn2cPD2ZfjOop2bhuDdPY0A@mail.gmail.com%3E).
    
    It would be nice, if you could update the PR to use ExecutionConfig. 
    Thanks a lot, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r39450937
  
    --- Diff: docs/apis/storm_compatibility.md ---
    @@ -169,6 +169,13 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.
     
     See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
     
    +## Configure for embedded Spouts/Bolts
    +Embedded Spouts/Bolts can be configure with user defined parameters.
    +User defined parameters is stored in a `Map`(as in Storm).
    +And this Map is provided as a parameter in the calls `Spout.open(...)` and `Bolt.prepare(...)`.
    +Configuration can be used in storm topologies mode or flink mode.
    --- End diff --
    
    capitalize *Storm* and *Flink*


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135411850
  
    Can you try to serialize the whole `Map` into a single `byte[]`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-136289777
  
    This is the stack trace (occurs in 4/5 runs -- the other run failed before due to unrelated test). It seems you broke something.
    ```
    Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 5.149 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase
    testJobWithoutObjectReuse(org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase)  Time elapsed: 3.825 sec  <<< FAILURE!
    java.lang.AssertionError: Error while calling the test program: Job execution failed.
    	at org.junit.Assert.fail(Assert.java:88)
    	at org.apache.flink.streaming.util.StreamingProgramTestBase.testJobWithoutObjectReuse(StreamingProgramTestBase.java:102)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:606)
    	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
    	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
    	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
    	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
    	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
    	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
    ```
    I will review after you fixed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-136302889
  
    Travis run on Linux. There is only a single ":" in the path there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37976623
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java ---
    @@ -52,6 +77,38 @@ public void testRunExecuteCancelInfinite() throws Exception {
     	}
     
     	@Test
    +	public void testOpen() throws Exception {
    +		final IRichSpout spout = mock(IRichSpout.class);
    +		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
    +
    +		Map stormConf = new HashMap();
    +		stormConf.put(new String("path"), new String("/home/user/file.txt"));
    +		stormConf.put(1, 1024);
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.putStormConf(stormConf);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +
    +		spoutWrapper.setRuntimeContext(runtimeContext);
    +		spoutWrapper.open(mock(Configuration.class));
    +		final SourceFunction.SourceContext ctx = mock(SourceFunction.SourceContext.class);
    +		spoutWrapper.cancel();
    +		spoutWrapper.run(ctx);
    +
    +		Map mapExpect = new HashMap();
    +		mapExpect.put(new String("path"), new String("/home/user/file.txt"));
    +		mapExpect.put(1, 1024);
    +		verify(spout).open(eq(mapExpect), any(TopologyContext.class), any(SpoutOutputCollector.class));
    --- End diff --
    
    Use `stormConf` instead of `mapExpect`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38913931
  
    --- Diff: docs/apis/storm_compatibility.md ---
    @@ -201,6 +201,26 @@ DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt f
     
     See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java) for a full example.
     
    +## Configure for embedded Spouts/Bolts
    --- End diff --
    
    Move this whole section one up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37746137
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java ---
    @@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
     
     	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
     			final SubmitOptions submitOpts) throws Exception {
    -		ClusterUtil
    -				.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
    +		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
    +		Configuration jobConfiguration = jobGraph.getJobConfiguration();
    +
    +		/* storm conf type must be Map<String, Object> */
    +		Map<String, Object> stormConf = (Map<String, Object>)conf;
    --- End diff --
    
    Your comment is not rendered correctly. Read "Markdown supported". For inline source code use single tick at the beginning and end. You meant `public class Config extends HashMap<String, Object>` but `<String,Object>` is not shown above....


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-140222747
  
    Actually, I think going through the `TaskConfig` as proposed by @mjsax is the cleaner way. Going through the system-internal `JobConfiguration` and exposing it to user programs is not a good choice, in my opinion. 
    
    The purpose of `TaskConfig` is exactly to give parameters to a task (function, spout). Also the `StreamingRuntimeContext` would not need to be adapted, because it already offers a method `getTaskStubParameters()`. Would that work as well or are there major issues preventing you from using the `TaskConfig`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38867413
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java ---
    @@ -86,11 +99,43 @@ public static void main(final String[] args) throws Exception {
     	// USER FUNCTIONS
     	// *************************************************************************
     
    -	private static class ExclamationMap implements MapFunction<String, String> {
    +	private static class ExclamationMap extends AbstractRichFunction implements MapFunction<String, String> {
    +
    +		private String exclamation;
    +
    +		@Override
    +		public void open(Configuration parameters) throws Exception {
    +			RuntimeContext ctx = super.getRuntimeContext();
    --- End diff --
    
    You should not use `JobConfiguration` for regular Flink functions. Instantiate `exlamation` via constructor Argument `exlcamationNum`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37745589
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---
    @@ -97,9 +103,21 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
     	}
     
     	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		/* parameters is task configuration, we can get storm configuration only from job configuration */
    +		RuntimeContext ctx = super.getRuntimeContext();
    +		if (ctx instanceof StreamingRuntimeContext)
    +		{
    +			Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
    +			stormConf = new HashMap<String, Object>();
    +			stormConf.putAll(jobConfiguration.getConfData());
    --- End diff --
    
    If it cannot be `null` in test, does not mean it cannot be `null` in real cluster deployment...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38914538
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java ---
    @@ -70,7 +92,7 @@ public void testRawType() throws Exception {
     		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
     
     		StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
    -				Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
    +				Sets.newHashSet(new String[]{Utils.DEFAULT_STREAM_ID}));
    --- End diff --
    
    No formatting changes please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37741748
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java ---
    @@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
     
     	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
     			final SubmitOptions submitOpts) throws Exception {
    -		ClusterUtil
    -				.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
    +		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
    +		Configuration jobConfiguration = jobGraph.getJobConfiguration();
    +
    +		/* storm conf type must be Map<String, Object> */
    +		Map<String, Object> stormConf = (Map<String, Object>)conf;
    --- End diff --
    
    The configuration in Strom is an untyped `Map`. Casting it to `Map<String, Object>` might fail. Flink should not limit the Map in this way.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-138051794
  
    @mjsax @StephanEwen Thanks. I have finish the changes at the core classes. 
    when user use env.getConfig().setGlobalJobParameters(conf); to set storm configuration,  I need convert Configuration into map<Object,Object>, toMap() is not enough, So i add getConfDataClone() to get a clone of inner config hashmap.
    I have a look at how to use TaskConfiguration instead of JobConfiguration.
    TaskConfiguration transfer path is:
    TaskConfiguration <-- TaskDeploymentDescriptor <-- JobVertex <-- StreamConfig 
    In StreamingJobGraphGenerator.java setVertexConfig() function set TaskConfiguration, like this:
    		...
    		config.setNumberOfOutputs(nonChainableOutputs.size());
    		config.setNonChainedOutputs(nonChainableOutputs);
    		config.setChainedOutputs(chainableOutputs);
    		config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
    		...
    If we want to use TaskConfiguration instead of JobConfiguration, we need add special interface to set stormConfig in StreamGraph and StreamConfig.
    I think it maybe break the separation between the storm-compatibility and core code.
    what is your opinion?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38456958
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---
    @@ -97,9 +106,42 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
     	}
     
     	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		stormConf = new HashMap();
    +
    +		/* parameters is task configuration, we can get storm configuration only from job configuration */
    +		RuntimeContext ctx = super.getRuntimeContext();
    +		if (ctx instanceof StreamingRuntimeContext) {
    +			Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
    +
    --- End diff --
    
    I see. Would it be better to add Storm config to the task-config to simplify code in `open(...)`  method? For `FlinkTopology` the same config would be added to each Spout/Bolt. At least for embedded mode, using task-config instead of job-config would be more appropriate IMHO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37741197
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---
    @@ -97,9 +103,21 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
     	}
     
     	@Override
    +	public void open(Configuration parameters) throws Exception {
    +		/* parameters is task configuration, we can get storm configuration only from job configuration */
    +		RuntimeContext ctx = super.getRuntimeContext();
    +		if (ctx instanceof StreamingRuntimeContext)
    +		{
    +			Configuration jobConfiguration = ((StreamingRuntimeContext) ctx).getJobConfiguration();
    +			stormConf = new HashMap<String, Object>();
    +			stormConf.putAll(jobConfiguration.getConfData());
    --- End diff --
    
    Missing `null` test on `jobConfiguration` (or is this test not necessary -- it is done in StromBoltWrapper)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135403187
  
    @StephanEwen Thansk.The key of storm config is object, so maybe the confData(HashMap<String, Object>) of Configuration is not enough.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38312152
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java ---
    @@ -101,20 +143,23 @@ public String map(String value) throws Exception {
     	private static boolean fileOutput = false;
     	private static String textPath;
     	private static String outputPath;
    +	private static int exclamationNum;
     
     	private static boolean parseParameters(final String[] args) {
     
     		if (args.length > 0) {
     			// parse input arguments
     			fileOutput = true;
    -			if (args.length == 2) {
    +			if (args.length == 3) {
     				textPath = args[0];
     				outputPath = args[1];
    +				exclamationNum = Integer.parseInt(args[2]);
     			} else {
     				System.err.println("Usage: ExclamationWithStormBolt <text path> <result path>");
    --- End diff --
    
    extend parameter list


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r39453328
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java ---
    @@ -91,7 +91,16 @@ public InputSplitProvider getInputSplitProvider() {
     	public Configuration getTaskStubParameters() {
     		return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
     	}
    -	
    +
    +	/**
    +	 * Returns the job configuration.
    +	 *
    +	 * @return The job configuration.
    +	 */
    +	public Configuration getJobConfiguration() {
    +		return new Configuration(env.getJobConfiguration());
    --- End diff --
    
    The `JobConfiguration` is a system-internal configuration. I am not sure that it is a good idea to give access to it in the `StreamingRuntimeContext`. This change will be visible for all DataStream programs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-134162997
  
    Travis fails because you broke something...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-138320697
  
    Can you update documentation, too? For README.md just delete the line that claims configuration is not supported. WebPage documentation should contain a short paragraph how embedded Spouts/Bolts can be configures (for whole topologies it works as in Storm, ie, we don't need to cover this in the documentation).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37741352
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java ---
    @@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
     
     	@SuppressWarnings("unchecked")
     	@Test
    +	public void testOpenWithStormConf() throws Exception {
    +		final IRichBolt bolt = mock(IRichBolt.class);
    +		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
    +
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
    +		jobConfiguration.setInteger(new String("delimitSize"), 1024);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +
    +		wrapper.setup(mock(Output.class), ctx);
    +		wrapper.open(mock(Configuration.class));
    +
    +		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
    --- End diff --
    
    You should not check for `any(Map.class)` but check if the map contains the value you set above in `jobConfiguration`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135000384
  
    Can anyone have a look at `Configuration.java`. Not sure if the changes are ok.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135412789
  
    Oh. you are right. Serialize the whole Map into a single byte[] is better.Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-136304296
  
    Oh, So not all test case can run successfully in windows? 
    I have change this and commit again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38914417
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java ---
    @@ -107,4 +112,39 @@ public static TopologyContext convertToTopologyContext(final StreamingRuntimeCon
     		return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
     	}
     
    +	/**
    +	 * Get storm configuration from StreamingRuntimeContext.
    +	 * @param ctx The RuntimeContext of operator.
    +	 * @return The storm configuration map.
    +	 * @throws Exception
    +	 * 		If configuration contains classes from the user code, it may lead to ClassNotFoundException..
    --- End diff --
    
    Typo. Two dots at the end.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-136301903
  
    @mjsax Thanks.
    I have a question:
    In my windows machine, the textPath of ExclamationWithStormSpoutITCase is file:/C:/Users/xxx/AppData/Local/Temp/org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase-text.txt. After split by ":", we get path /Users/xxx/AppData/Local/Temp/org.apache.flink.stormcompatibility.exclamation.ExclamationWithStormSpoutITCase-text.txt and StormFileSpout can not open this path.But CI can run successfully before. Do you know why?
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-138292133
  
    @mjsax @StephanEwen @rmetzger I have finish the change about all comments. Can you have a look at it if it can be merged? Thank you very much!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37745377
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java ---
    @@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
     
     	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
     			final SubmitOptions submitOpts) throws Exception {
    -		ClusterUtil
    -				.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
    +		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
    +		Configuration jobConfiguration = jobGraph.getJobConfiguration();
    +
    +		/* storm conf type must be Map<String, Object> */
    +		Map<String, Object> stormConf = (Map<String, Object>)conf;
    --- End diff --
    
    The configuration in Storm is "public class Config extends HashMap<String, Object>".It extends HashMap<String, Object>,if i use untyped Map, maybe it is hard to convert it into Storm Config.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r39450885
  
    --- Diff: docs/apis/storm_compatibility.md ---
    @@ -169,6 +169,13 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.
     
     See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.  
     
    +## Configure for embedded Spouts/Bolts
    +Embedded Spouts/Bolts can be configure with user defined parameters.
    +User defined parameters is stored in a `Map`(as in Storm).
    --- End diff --
    
    ... *are* stored ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37746694
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java ---
    @@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
     
     	@SuppressWarnings("unchecked")
     	@Test
    +	public void testOpenWithStormConf() throws Exception {
    +		final IRichBolt bolt = mock(IRichBolt.class);
    +		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
    +
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
    +		jobConfiguration.setInteger(new String("delimitSize"), 1024);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +
    +		wrapper.setup(mock(Output.class), ctx);
    +		wrapper.open(mock(Configuration.class));
    --- End diff --
    
    The open() is usually called by openAllOperators(), and the Configuration config parameter is usually task Configuration, not job Configuration. So i mock it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38312165
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java ---
    @@ -101,20 +143,23 @@ public String map(String value) throws Exception {
     	private static boolean fileOutput = false;
     	private static String textPath;
     	private static String outputPath;
    +	private static int exclamationNum;
     
     	private static boolean parseParameters(final String[] args) {
     
     		if (args.length > 0) {
     			// parse input arguments
     			fileOutput = true;
    -			if (args.length == 2) {
    +			if (args.length == 3) {
     				textPath = args[0];
     				outputPath = args[1];
    +				exclamationNum = Integer.parseInt(args[2]);
     			} else {
     				System.err.println("Usage: ExclamationWithStormBolt <text path> <result path>");
     				return false;
     			}
     		} else {
    +			exclamationNum = 3;
     			System.out.println("Executing ExclamationWithStormBolt example with built-in default data");
     			System.out.println("  Provide parameters to read input data from a file");
     			System.out.println("  Usage: ExclamationWithStormBolt <text path> <result path>");
    --- End diff --
    
    param list


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-139959540
  
    @StephanEwen @rmetzger Can you have a look at it if it can be merged? I am also work on storm task hooks and it depend on this PR. Thank you very much!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38867724
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/stormoperators/ExclamationBolt.java ---
    @@ -31,10 +31,22 @@
     public class ExclamationBolt implements IRichBolt {
     	OutputCollector _collector;
     
    +	private String exclamation;
    +
     	@SuppressWarnings("rawtypes")
     	@Override
     	public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
     		_collector = collector;
    +		exclamation = "!!!";
    +
    +		if (conf != null && conf.containsKey("exclamationNum")) {
    --- End diff --
    
    We can safely assume, that `conf != null`. No checking needed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r39453004
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/Configuration.java ---
    @@ -418,6 +418,17 @@ public void setBytes(String key, byte[] bytes) {
     		}
     	}
     
    +	/**
    +	 * Returns the clone of confData.
    +	 *
    +	 * @return the clone of confData
    +	 */
    +	public HashMap<String, Object> getConfDataClone() {
    +		synchronized (this.confData) {
    +			return new HashMap<String, Object>(this.confData);
    --- End diff --
    
    Yes. That's fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-136241503
  
    @mjsax @StephanEwen I have finish the code changes.
    1.serialize Storm Config as a byte[] into the Flink configuration
    2.extend ExclamationTopology such that the number of added !in ExclamationBolt and ExclamationWithStormSpout.ExclamationMap is configurable and adapt the tests.
    3.extend FiniteStormFileSpout and base class with an empty constructor and configure the file to be opened via Storm configuration Map.
    I have run flink-storm-compatibility test successfully in local machine and do not know why CI failed.
    Can you have a look at my code? Thank you very much.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1046


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37741303
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java ---
    @@ -222,6 +239,31 @@ public void testOpenSink() throws Exception {
     
     	@SuppressWarnings("unchecked")
     	@Test
    +	public void testOpenWithStormConf() throws Exception {
    +		final IRichBolt bolt = mock(IRichBolt.class);
    +		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
    +
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.setString(new String("path"), new String("/home/user/file.txt"));
    +		jobConfiguration.setInteger(new String("delimitSize"), 1024);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +
    +		wrapper.setup(mock(Output.class), ctx);
    +		wrapper.open(mock(Configuration.class));
    --- End diff --
    
    Why do you mock here and not use `jobConfiguration`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135406207
  
    Sorry, but I don't understand your question...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-134162565
  
    I don't see changes in `FlinkClient`. Only in `FlinkLocalCluster`. Did you test by starting Flink via `bin/start-local.sh` (it would be even better to test in a real cluster)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38867892
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java ---
    @@ -45,9 +49,16 @@ public StormFileSpout(final String path) {
     	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
     		super.open(conf, context, collector);
     		try {
    +			/*  If config is given, it should override constructor path  */
    +			if (conf != null && conf.containsKey("textpath")) {
    +				this.path = (String)conf.get("textpath");
    +			}
    +
     			this.reader = new BufferedReader(new FileReader(this.path));
     		} catch (final FileNotFoundException e) {
     			throw new RuntimeException(e);
    +		} catch (final NullPointerException e) {
    --- End diff --
    
    Why do you catch `NullPointerException`? Can be removed. NPE is a runtime exception anyway and you just re-throw.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-137742282
  
    I have just a quick look over it, and so far it like it. Two things are open to be discussed. I not sure it the change to `ConfigConstants` in a good choice. Would it be feasible to use `TaskConfiguration` instead of `JobConfiguration`. This would make a code cleaner from my point of view.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-138470052
  
    @mjsax @StephanEwen @rmetzger I have finish the change about all comments and update documentation. Can you have a look at it if it can be merged? Thank you very much!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37976536
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java ---
    @@ -222,6 +240,36 @@ public void testOpenSink() throws Exception {
     
     	@SuppressWarnings("unchecked")
     	@Test
    +	public void testOpenWithStormConf() throws Exception {
    +		final IRichBolt bolt = mock(IRichBolt.class);
    +		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
    +
    +		Map stormConf = new HashMap();
    +		stormConf.put(new String("path"), new String("/home/user/file.txt"));
    +		stormConf.put(1, 1024);
    +		Configuration jobConfiguration = new Configuration();
    +		jobConfiguration.putStormConf(stormConf);
    +		Environment env = new RuntimeEnvironment(new JobID(), new JobVertexID(), new ExecutionAttemptID(),
    +				new String(), new String(), 1, 2, jobConfiguration, mock(Configuration.class), mock(ClassLoader.class),
    +				mock(MemoryManager.class), mock(IOManager.class), mock(BroadcastVariableManager.class),
    +				mock(AccumulatorRegistry.class), mock(InputSplitProvider.class), mock(Map.class),
    +				new ResultPartitionWriter[1], new InputGate[1], mock(ActorGateway.class),
    +				mock(TaskManagerRuntimeInfo.class));
    +		StreamingRuntimeContext ctx = new StreamingRuntimeContext(env, new ExecutionConfig(),
    +				mock(KeySelector.class),
    +				mock(StateHandleProvider.class), mock(Map.class));
    +
    +		wrapper.setup(mock(Output.class), ctx);
    +		wrapper.open(mock(Configuration.class));
    +
    +		Map mapExpect = new HashMap();
    +		mapExpect.put(new String("path"), new String("/home/user/file.txt"));
    +		mapExpect.put(1, 1024);
    +		verify(bolt).prepare(eq(mapExpect), any(TopologyContext.class), any(OutputCollector.class));
    --- End diff --
    
    Remove `mapExpected` and use `stormConf` from above to avoid code duplication.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38867651
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java ---
    @@ -85,12 +103,43 @@ public static void main(final String[] args) throws Exception {
     	// USER FUNCTIONS
     	// *************************************************************************
     
    -	private static class ExclamationMap implements MapFunction<String, String> {
    +	private static class ExclamationMap extends AbstractRichFunction implements MapFunction<String, String> {
     		private static final long serialVersionUID = -684993133807698042L;
    +		private String exclamation;
    +
    +		@Override
    +		public void open(Configuration parameters) throws Exception {
    +			RuntimeContext ctx = super.getRuntimeContext();
    --- End diff --
    
    Same here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38312340
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormSpout.java ---
    @@ -85,8 +103,36 @@ public static void main(final String[] args) throws Exception {
     	// USER FUNCTIONS
     	// *************************************************************************
     
    -	private static class ExclamationMap implements MapFunction<String, String> {
    +	private static class ExclamationMap extends AbstractRichFunction implements MapFunction<String, String> {
     		private static final long serialVersionUID = -684993133807698042L;
    +		private String exclamation;
    +
    +		@Override
    +		public void open(Configuration parameters) throws Exception {
    +			exclamation = new String("!!!");
    --- End diff --
    
    why no use input parameter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-134190885
  
    It fails in two test. You should actually see it, if you execute test locally. You should run test each time before you open/update an PR (at least for the module you did changes).
    
    ```
    Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.751 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.split.BoltSplitITCase
    testTopology(org.apache.flink.stormcompatibility.split.BoltSplitITCase) Time elapsed: 0.635 sec <<< ERROR!
    java.lang.NullPointerException: null
    at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopologyWithOpts(FlinkLocalCluster.java:52)
    at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopology(FlinkLocalCluster.java:41)
    at org.apache.flink.stormcompatibility.split.StormSplitStreamBoltLocal.main(StormSplitStreamBoltLocal.java:42)
    at org.apache.flink.stormcompatibility.split.BoltSplitITCase.testTopology(BoltSplitITCase.java:25)
    ```
    and
    ```
    Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0.704 sec <<< FAILURE! - in org.apache.flink.stormcompatibility.split.SpoutSplitITCase
    testTopology(org.apache.flink.stormcompatibility.split.SpoutSplitITCase) Time elapsed: 0.584 sec <<< ERROR!
    java.lang.NullPointerException: null
    at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopologyWithOpts(FlinkLocalCluster.java:52)
    at org.apache.flink.stormcompatibility.api.FlinkLocalCluster.submitTopology(FlinkLocalCluster.java:41)
    at org.apache.flink.stormcompatibility.split.StormSplitStreamSpoutLocal.main(StormSplitStreamSpoutLocal.java:42)
    at org.apache.flink.stormcompatibility.split.SpoutSplitITCase.testTopology(SpoutSplitITCase.java:25)
    ```
    
    Just out of curiosity: why can you not see Travis details? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-136305354
  
    This might be the case. I never tried it. And as far as I know, all developers work on Linux or Mac, so this was never an issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135396475
  
    You can turn the storm config into a byte[] via the `InstantiationUtil` class. That byte[] can be stored in the regular config.
    
    We could also add a "nested config" class which only shows keys that start with a certain key prefix. This would be a config view over another config. Inside the `TaskConfig`, we use this trick with the delegating config. It may be useful here as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38914633
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java ---
    @@ -88,9 +88,23 @@ public static void main(final String[] args) throws Exception {
     
     	private static class ExclamationMap implements MapFunction<String, String> {
     
    +		private String exclamation;
    +
    +		public ExclamationMap() {
    --- End diff --
    
    Use `exclamationNum` as constructor parameter and build `exclamation` accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38312609
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java ---
    @@ -45,9 +49,16 @@ public StormFileSpout(final String path) {
     	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
     		super.open(conf, context, collector);
     		try {
    +			/* if inputFile path has not set, get it from storm configuration  */
    +			if (this.path == null && conf != null && conf.containsKey(new String("textpath"))) {
    --- End diff --
    
    I would not check for `this.path == null`. If config is given, it should override constructor path.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-134184818
  
    @mjsax Thank you very much.I miss the change in FlinkClient.I will fix it and test via bin/start-local.sh.In china, now we can not see the CI details and it is hard to know why CI failed.Thank you for your reminder.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by ffbin <gi...@git.apache.org>.
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-135405543
  
    @mjsax hi. I want to make the number of added '!' in ExclamationBolt and ExclamationWithStormSpout.ExclamationMap configurabled by prepare() / open() function.The number can be get from jobConfiguration.What is your suggestion? Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r37746515
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java ---
    @@ -41,8 +43,49 @@ public void submitTopology(final String topologyName, final Map<?, ?> conf, fina
     
     	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
     			final SubmitOptions submitOpts) throws Exception {
    -		ClusterUtil
    -				.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
    +		JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
    +		Configuration jobConfiguration = jobGraph.getJobConfiguration();
    +
    +		/* storm conf type must be Map<String, Object> */
    +		Map<String, Object> stormConf = (Map<String, Object>)conf;
    --- End diff --
    
    The definition of `Config` in not important here. Storm uses raw map as parameter in `StormSubmitter.submitTopology(...)`, `Spout.open(...)`, and `Bolt.prepare(...)`. Thus, it would be possible to use a different type than `String` for key values.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38865906
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---
    @@ -57,6 +58,10 @@
     	 * Indicates, if the source is still running or was canceled.
     	 */
     	protected volatile boolean isRunning = true;
    +	/**
    +	 * The job configuration which include storm configuration.
    +	 */
    +	protected Map config;
    --- End diff --
    
    Can be removed, if `open()` is not used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/1046#issuecomment-138524518
  
    I just had a few "cosmetic" comments. Otherwise it looks good to me to get merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38867760
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java ---
    @@ -45,9 +49,16 @@ public StormFileSpout(final String path) {
     	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
     		super.open(conf, context, collector);
     		try {
    +			/*  If config is given, it should override constructor path  */
    +			if (conf != null && conf.containsKey("textpath")) {
    +				this.path = (String)conf.get("textpath");
    --- End diff --
    
    `conf != null` can be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by mjsax <gi...@git.apache.org>.
Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38913061
  
    --- Diff: docs/apis/storm_compatibility.md ---
    @@ -201,6 +201,26 @@ DataStream<BoltOutputType> s2 = splitStream.select("s2").transform(/* use Bolt f
     
     See [SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/split/SpoutSplitExample.java) for a full example.
     
    +## Configure for embedded Spouts/Bolts
    +Embedder spouts/bolts can be configure with user defined parameters. User defined parameters is a config `Map` and be called by `Spout.open(...)` and `Bolt.prepare()`
    --- End diff --
    
    Typo: Embedded
    Please spell Spouts and Bolts always with a capital letter (for consistency over the whole documentation)
    Please also follow a "one sentence per line" source code style. It makes diffs easier to read in the future.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-2525]Add configuration support in Storm...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1046#discussion_r38844802
  
    --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationWithStormBolt.java ---
    @@ -78,6 +86,11 @@ public static void main(final String[] args) throws Exception {
     			exclaimed.print();
     		}
     
    +		// set bolt and map exclamation marks num
    +		Configuration conf = new Configuration();
    +		conf.setInteger(new String("exclamationNum"), exclamationNum);
    --- End diff --
    
    Why are you creating Strings like this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---