Posted to issues@flink.apache.org by akunft <gi...@git.apache.org> on 2015/08/27 08:14:10 UTC

[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

GitHub user akunft opened a pull request:

    https://github.com/apache/flink/pull/1066

    [FLINK-2373] Add configuration parameter to createRemoteEnvironment method

    Adds an overloaded ExecutionEnvironment.createRemoteEnvironment method that accepts a custom configuration for the job client. This resolves problems with the default configuration values, e.g. large jar files causing errors when they exceed Akka's default payload size.
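
    As a rough usage sketch (not part of the patch): the three-argument call below mirrors the one used in the new test; the class name, hostname, port, and the raised `akka.framesize` value are placeholders chosen for illustration.

    ```java
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;

    public class RemoteEnvExample {
        public static void main(String[] args) throws Exception {
            // Client-side configuration for the job client only; the cluster itself
            // is configured in the remotely running Flink instance.
            Configuration clientConfig = new Configuration();
            // Placeholder value: raise the Akka payload limit so large job graphs / jars can be shipped.
            clientConfig.setString("akka.framesize", "104857600b");

            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                    "jobmanager-host", 6123, clientConfig);

            // Define and run the program as usual.
            env.fromElements(1, 2, 3).print();
        }
    }
    ```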

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/akunft/flink remoteConf

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1066.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1066
    
----
commit eeabe40e20386b7644c748301d7f9b338a7a2adc
Author: Andreas Kunft <an...@tu-berlin.de>
Date:   2015-07-28T23:58:32Z

    [FLINK-2373] Add configuration parameter to createRemoteEnvironment method

----



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38629325
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +    private static final int TM_SLOTS = 4;
    +
    +    private static final int NUM_TM = 1;
    +
    +    private static final int USER_DOP = 2;
    +
    +    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +    private static ForkableFlinkMiniCluster cluster;
    +
    +    @BeforeClass
    +    public static void setupCluster() {
    +        try {
    +            Configuration config = new Configuration();
    +            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +            cluster = new ForkableFlinkMiniCluster(config, false);
    +            cluster.start();
    +        }
    +        catch (Exception e) {
    +            e.printStackTrace();
    +            fail("Error starting test cluster: " + e.getMessage());
    +        }
    +    }
    +
    +    @AfterClass
    +    public static void tearDownCluster() {
    +        try {
    +            cluster.stop();
    +        }
    +        catch (Throwable t) {
    +            t.printStackTrace();
    +            fail("Cluster shutdown caused an exception: " + t.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * Ensure that that Akka configuration parameters can be set.
    +     */
    +    @Test(expected=IllegalArgumentException.class)
    +    public void testInvalidAkkaConfiguration() throws Throwable {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    +        result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    +        try {
    +            env.execute();
    --- End diff --
    
    Sorry, you're completely right. My bad.
    
    On Wed, Sep 2, 2015 at 6:54 PM, Andreas Kunft <no...@github.com>
    wrote:
    
    > In
    > flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
    > <https://github.com/apache/flink/pull/1066#discussion_r38556479>:
    >
    > > +    @Test(expected=IllegalArgumentException.class)
    > > +    public void testInvalidAkkaConfiguration() throws Throwable {
    > > +        Configuration config = new Configuration();
    > > +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    > > +
    > > +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    > > +                cluster.hostname(),
    > > +                cluster.getLeaderRPCPort(),
    > > +                config
    > > +        );
    > > +        env.getConfig().disableSysoutLogging();
    > > +
    > > +        DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    > > +        result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    > > +        try {
    > > +            env.execute();
    >
    > It would still fail, as no exception is thrown. But I can generate a nicer
    > error message. Will fix that.
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/flink/pull/1066/files#r38556479>.
    >




[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1066



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by akunft <gi...@git.apache.org>.
Github user akunft commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-137165900
  
    Yes, I didn't enforce that the config parameter is not used, but setParallelism overrides it.
    I left it like this, since explaining all of this in the Javadocs would be more confusing for the user, and someone may still want to set the parallelism via the config.



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r39253434
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +	private static final int TM_SLOTS = 4;
    +
    +	private static final int NUM_TM = 1;
    +
    +	private static final int USER_DOP = 2;
    +
    +	private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +	private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +	private static ForkableFlinkMiniCluster cluster;
    +
    +	@BeforeClass
    +	public static void setupCluster() {
    +		try {
    +			Configuration config = new Configuration();
    +			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +			cluster = new ForkableFlinkMiniCluster(config, false);
    +			cluster.start();
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			fail("Error starting test cluster: " + e.getMessage());
    +		}
    +	}
    +
    +	@AfterClass
    +	public static void tearDownCluster() {
    +		try {
    +			cluster.stop();
    +		}
    +		catch (Throwable t) {
    +			t.printStackTrace();
    +			fail("Cluster shutdown caused an exception: " + t.getMessage());
    +		}
    +	}
    +
    +	/**
    +	 * Ensure that that Akka configuration parameters can be set.
    +	 */
    +	@Test(expected=IllegalArgumentException.class)
    +	public void testInvalidAkkaConfiguration() throws Throwable {
    +		Configuration config = new Configuration();
    +		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +				cluster.hostname(),
    +				cluster.getLeaderRPCPort(),
    +				config
    +		);
    +		env.getConfig().disableSysoutLogging();
    +
    +		DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    +		result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    +		try {
    +			env.execute();
    +			Assert.fail("Program should not run successfully, cause of invalid akka settings.");
    +		} catch (ProgramInvocationException ex) {
    +			throw ex.getCause();
    +		}
    +	}
    +
    +	/**
    +	 * Ensure that the program parallelism can be set even if the configuration is supplied.
    +	 */
    +	@Test
    +	public void testUserSpecificParallelism() throws Exception {
    +		Configuration config = new Configuration();
    +		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
    +
    +		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +				cluster.hostname(),
    +				cluster.getLeaderRPCPort(),
    +				config
    +		);
    +		env.setParallelism(USER_DOP);
    +		env.getConfig().disableSysoutLogging();
    +
    +		DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
    +				.rebalance()
    +				.mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
    +					@Override
    +					public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
    +						out.collect(getRuntimeContext().getIndexOfThisSubtask());
    +					}
    +				});
    +		List<Integer> resultCollection = new ArrayList<Integer>();
    +		result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
    --- End diff --
    
    Here you can also use `result.collect()` to obtain a list of integers.
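
    A minimal sketch of that change (replacing the `LocalCollectionOutputFormat` sink and the separate `env.execute()` call in the test above; `result` is the `DataSet<Integer>` built there):

    ```java
    // collect() ships the data set back to the client and triggers execution,
    // so no explicit sink or env.execute() call is needed.
    List<Integer> resultCollection = result.collect();
    assertEquals(USER_DOP, resultCollection.size());
    ```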



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38074069
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
    @@ -1105,6 +1105,27 @@ public static ExecutionEnvironment createRemoteEnvironment(String host, int port
     	}
     
     	/**
    +	 * Creates a {@link RemoteEnvironment}. The remote environment sends (parts of) the program
    +	 * to a cluster for execution. Note that all file paths used in the program must be accessible from the
    +	 * cluster. The custom configuration file is used to configure the default parallelism and Akka specific
    +	 * configuration parameters for the JobClient only. Cluster configuration has to be done in the remotely
    +	 * running Flink instance.
    --- End diff --
    
    Very good :-)



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by akunft <gi...@git.apache.org>.
Github user akunft commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-136829682
  
    Added tests + behavior described above.



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by akunft <gi...@git.apache.org>.
Github user akunft commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38556752
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +    private static final int TM_SLOTS = 4;
    +
    +    private static final int NUM_TM = 1;
    +
    +    private static final int USER_DOP = 2;
    +
    +    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +    private static ForkableFlinkMiniCluster cluster;
    +
    +    @BeforeClass
    +    public static void setupCluster() {
    +        try {
    +            Configuration config = new Configuration();
    +            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +            cluster = new ForkableFlinkMiniCluster(config, false);
    +            cluster.start();
    +        }
    +        catch (Exception e) {
    +            e.printStackTrace();
    +            fail("Error starting test cluster: " + e.getMessage());
    +        }
    +    }
    +
    +    @AfterClass
    +    public static void tearDownCluster() {
    +        try {
    +            cluster.stop();
    +        }
    +        catch (Throwable t) {
    +            t.printStackTrace();
    +            fail("Cluster shutdown caused an exception: " + t.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * Ensure that that Akka configuration parameters can be set.
    +     */
    +    @Test(expected=IllegalArgumentException.class)
    +    public void testInvalidAkkaConfiguration() throws Throwable {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    +        result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    +        try {
    +            env.execute();
    +        } catch (ProgramInvocationException ex) {
    +            throw ex.getCause();
    +        }
    +    }
    +
    +    /**
    +     * Ensure that the program parallelism can be set even if the configuration is supplied.
    +     */
    +    @Test
    +    public void testUserSpecificParallelism() throws Exception {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.setParallelism(USER_DOP);
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
    +                .rebalance()
    +                .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
    +                    @Override
    +                    public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
    +                        out.collect(getRuntimeContext().getIndexOfThisSubtask());
    +                    }
    +                });
    +        List<Integer> resultCollection = new ArrayList<Integer>();
    +        result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
    +        env.execute();
    +        assertEquals(USER_DOP, resultCollection.size());
    +    }
    +
    +    private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
    +
    +        private transient boolean emitted;
    +
    +        @Override
    +        public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
    +            assertEquals(USER_DOP, numSplits);
    +            return super.createInputSplits(numSplits);
    +        }
    +
    +        @Override
    +        public boolean reachedEnd() {
    +            return emitted;
    +        }
    +
    +        @Override
    +        public Integer nextRecord(Integer reuse) {
    +            if (emitted) {
    +                return null;
    +            }
    +            emitted = true;
    +            return 1;
    +        }
    +    }
    +}
    --- End diff --
    
    done



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by akunft <gi...@git.apache.org>.
Github user akunft commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-135830165
  
    After thinking about it, I would change the behavior so that the config is only used for defining Akka-specific settings. The default parallelism of the operators can be set via ```setParallelism```, just like in the local environment. Otherwise, we would have to use ```parallelism.default``` to set the operators' parallelism, which is confusing imo. This is also in line with the configuration description on the website.
    
    What do you think? 
    
    In any case, I will create unit tests and update the PR once we decided on a version.



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-135448224
  
    @uce, I think this should be addressed either in #1016 or in a separate PR once #1016 is merged.



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by akunft <gi...@git.apache.org>.
Github user akunft commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-139622654
  
    * rebased
    * replaced collection output with ```collect()``` in RemoteEnvironmentITCase and ExecutionEnvironmentITCase
    * added ```createRemoteEnvironment``` method with config parameter in Scala ExecutionEnvironment



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by akunft <gi...@git.apache.org>.
Github user akunft commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38554964
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +    private static final int TM_SLOTS = 4;
    +
    +    private static final int NUM_TM = 1;
    +
    +    private static final int USER_DOP = 2;
    +
    +    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +    private static ForkableFlinkMiniCluster cluster;
    +
    +    @BeforeClass
    +    public static void setupCluster() {
    +        try {
    +            Configuration config = new Configuration();
    +            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +            cluster = new ForkableFlinkMiniCluster(config, false);
    +            cluster.start();
    +        }
    +        catch (Exception e) {
    +            e.printStackTrace();
    +            fail("Error starting test cluster: " + e.getMessage());
    +        }
    +    }
    +
    +    @AfterClass
    +    public static void tearDownCluster() {
    +        try {
    +            cluster.stop();
    +        }
    +        catch (Throwable t) {
    +            t.printStackTrace();
    +            fail("Cluster shutdown caused an exception: " + t.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * Ensure that that Akka configuration parameters can be set.
    +     */
    +    @Test(expected=IllegalArgumentException.class)
    +    public void testInvalidAkkaConfiguration() throws Throwable {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    +        result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    +        try {
    +            env.execute();
    --- End diff --
    
    Yes, I just ensure it's caused by the wrong Akka setting.



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-139494208
  
    LGTM modulo one minor comment.



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-136969071
  
    From your last comment I conclude that you only wanted to allow Akka-specific settings to be set via the configuration. However, I couldn't find where you check this. Furthermore, if I remove the `env.setParallelism` call in the `testUserSpecificParallelism` test case, then I can still control the default DOP with `config.setInteger(ConfigConstants.DEFAULT_PARALLELISM, 42)`, for example.
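
    For illustration, a small sketch of how the two settings interact (constant names as used in the discussion; the hostname and port are placeholders):

    ```java
    Configuration config = new Configuration();
    // Without an explicit setParallelism() call, this value becomes the default DOP.
    config.setInteger(ConfigConstants.DEFAULT_PARALLELISM, 42);

    ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
            "jobmanager-host", 6123, config);

    // An explicit call overrides whatever the configuration specified.
    env.setParallelism(2);
    ```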



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by akunft <gi...@git.apache.org>.
Github user akunft commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38556479
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +    private static final int TM_SLOTS = 4;
    +
    +    private static final int NUM_TM = 1;
    +
    +    private static final int USER_DOP = 2;
    +
    +    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +    private static ForkableFlinkMiniCluster cluster;
    +
    +    @BeforeClass
    +    public static void setupCluster() {
    +        try {
    +            Configuration config = new Configuration();
    +            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +            cluster = new ForkableFlinkMiniCluster(config, false);
    +            cluster.start();
    +        }
    +        catch (Exception e) {
    +            e.printStackTrace();
    +            fail("Error starting test cluster: " + e.getMessage());
    +        }
    +    }
    +
    +    @AfterClass
    +    public static void tearDownCluster() {
    +        try {
    +            cluster.stop();
    +        }
    +        catch (Throwable t) {
    +            t.printStackTrace();
    +            fail("Cluster shutdown caused an exception: " + t.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * Ensure that that Akka configuration parameters can be set.
    +     */
    +    @Test(expected=IllegalArgumentException.class)
    +    public void testInvalidAkkaConfiguration() throws Throwable {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    +        result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    +        try {
    +            env.execute();
    --- End diff --
    
    It would still fail, as no exception is thrown. But I can generate a nicer error message. Will fix that.



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-139279212
  
    LGTM, @tillrohrmann do you want to have another look as well?



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-135372450
  
    Thanks for the PR. I agree with Till.
    
    @tillrohrmann, regarding #1016: what would be the way to use the RemoteEnvironment with an HA cluster? This will already work with HA, but the hostname and port would be ignored, right? Should Andreas already add a variant with config and jar files only? Or is that something we should address with your PR?



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-135348048
  
    Very good contribution. Thanks @akunft. The only thing would be to add a test which verifies that the configuration is actually used for starting the `JobClientActor`'s `ActorSystem`, for example. We could set the framesize to something so small that the job execution must fail. Or we could change the default parallelism as it is done in `ExecutionEnvironmentITCase.testLocalEnvironmentWithConfig`.
    
    After adding the test, +1 for merging :-)
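
    For reference, a rough sketch of the frame-size variant (assuming `ConfigConstants.AKKA_FRAMESIZE` is the key for the Akka maximum payload size; the exact value string and the resulting failure type would have to be checked):

    ```java
    Configuration config = new Configuration();
    // Deliberately tiny payload limit so that shipping the job graph must fail.
    config.setString(ConfigConstants.AKKA_FRAMESIZE, "1b");

    ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
            cluster.hostname(), cluster.getLeaderRPCPort(), config);
    // Executing any small program on env should now fail on submission.
    ```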



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38506169
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +    private static final int TM_SLOTS = 4;
    +
    +    private static final int NUM_TM = 1;
    +
    +    private static final int USER_DOP = 2;
    +
    +    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +    private static ForkableFlinkMiniCluster cluster;
    +
    +    @BeforeClass
    +    public static void setupCluster() {
    +        try {
    +            Configuration config = new Configuration();
    +            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +            cluster = new ForkableFlinkMiniCluster(config, false);
    +            cluster.start();
    +        }
    +        catch (Exception e) {
    +            e.printStackTrace();
    +            fail("Error starting test cluster: " + e.getMessage());
    +        }
    +    }
    +
    +    @AfterClass
    +    public static void tearDownCluster() {
    +        try {
    +            cluster.stop();
    +        }
    +        catch (Throwable t) {
    +            t.printStackTrace();
    +            fail("Cluster shutdown caused an exception: " + t.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * Ensure that that Akka configuration parameters can be set.
    +     */
    +    @Test(expected=IllegalArgumentException.class)
    +    public void testInvalidAkkaConfiguration() throws Throwable {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    +        result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    +        try {
    +            env.execute();
    --- End diff --
    
    Shouldn't we fail after this statement?



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38555428
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +    private static final int TM_SLOTS = 4;
    +
    +    private static final int NUM_TM = 1;
    +
    +    private static final int USER_DOP = 2;
    +
    +    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +    private static ForkableFlinkMiniCluster cluster;
    +
    +    @BeforeClass
    +    public static void setupCluster() {
    +        try {
    +            Configuration config = new Configuration();
    +            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +            cluster = new ForkableFlinkMiniCluster(config, false);
    +            cluster.start();
    +        }
    +        catch (Exception e) {
    +            e.printStackTrace();
    +            fail("Error starting test cluster: " + e.getMessage());
    +        }
    +    }
    +
    +    @AfterClass
    +    public static void tearDownCluster() {
    +        try {
    +            cluster.stop();
    +        }
    +        catch (Throwable t) {
    +            t.printStackTrace();
    +            fail("Cluster shutdown caused an exception: " + t.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * Ensure that that Akka configuration parameters can be set.
    +     */
    +    @Test(expected=IllegalArgumentException.class)
    +    public void testInvalidAkkaConfiguration() throws Throwable {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    +        result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    +        try {
    +            env.execute();
    --- End diff --
    
    I meant: shouldn't we insert an `Assert.fail()` after this statement? If `execute` completes successfully, then something went wrong, right?



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1066#issuecomment-140212838
  
    Thanks for the update!
    Will merge this PR tomorrow.



[GitHub] flink pull request: [FLINK-2373] Add configuration parameter to cr...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r38506469
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +    private static final int TM_SLOTS = 4;
    +
    +    private static final int NUM_TM = 1;
    +
    +    private static final int USER_DOP = 2;
    +
    +    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +    private static ForkableFlinkMiniCluster cluster;
    +
    +    @BeforeClass
    +    public static void setupCluster() {
    +        try {
    +            Configuration config = new Configuration();
    +            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +            cluster = new ForkableFlinkMiniCluster(config, false);
    +            cluster.start();
    +        }
    +        catch (Exception e) {
    +            e.printStackTrace();
    +            fail("Error starting test cluster: " + e.getMessage());
    +        }
    +    }
    +
    +    @AfterClass
    +    public static void tearDownCluster() {
    +        try {
    +            cluster.stop();
    +        }
    +        catch (Throwable t) {
    +            t.printStackTrace();
    +            fail("Cluster shutdown caused an exception: " + t.getMessage());
    +        }
    +    }
    +
    +    /**
    +     * Ensure that that Akka configuration parameters can be set.
    +     */
    +    @Test(expected=IllegalArgumentException.class)
    +    public void testInvalidAkkaConfiguration() throws Throwable {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<String> result = env.createInput(new TestNonRichInputFormat());
    +        result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
    +        try {
    +            env.execute();
    +        } catch (ProgramInvocationException ex) {
    +            throw ex.getCause();
    +        }
    +    }
    +
    +    /**
    +     * Ensure that the program parallelism can be set even if the configuration is supplied.
    +     */
    +    @Test
    +    public void testUserSpecificParallelism() throws Exception {
    +        Configuration config = new Configuration();
    +        config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT);
    +
    +        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
    +                cluster.hostname(),
    +                cluster.getLeaderRPCPort(),
    +                config
    +        );
    +        env.setParallelism(USER_DOP);
    +        env.getConfig().disableSysoutLogging();
    +
    +        DataSet<Integer> result = env.createInput(new ParallelismDependentInputFormat())
    +                .rebalance()
    +                .mapPartition(new RichMapPartitionFunction<Integer, Integer>() {
    +                    @Override
    +                    public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
    +                        out.collect(getRuntimeContext().getIndexOfThisSubtask());
    +                    }
    +                });
    +        List<Integer> resultCollection = new ArrayList<Integer>();
    +        result.output(new LocalCollectionOutputFormat<Integer>(resultCollection));
    +        env.execute();
    +        assertEquals(USER_DOP, resultCollection.size());
    +    }
    +
    +    private static class ParallelismDependentInputFormat extends GenericInputFormat<Integer> {
    +
    +        private transient boolean emitted;
    +
    +        @Override
    +        public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
    +            assertEquals(USER_DOP, numSplits);
    +            return super.createInputSplits(numSplits);
    +        }
    +
    +        @Override
    +        public boolean reachedEnd() {
    +            return emitted;
    +        }
    +
    +        @Override
    +        public Integer nextRecord(Integer reuse) {
    +            if (emitted) {
    +                return null;
    +            }
    +            emitted = true;
    +            return 1;
    +        }
    +    }
    +}
    --- End diff --
    
    This file is indented with whitespaces

