Posted to issues@flink.apache.org by skunert <gi...@git.apache.org> on 2014/12/04 13:35:10 UTC

[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

GitHub user skunert opened a pull request:

    https://github.com/apache/incubator-flink/pull/251

    Mesos integration of Apache Flink

    Hi,
    
    this PR adds a Mesos client to Flink. It still needs to be tested, which I will do as soon as I have access to a cluster. On a single machine running Mesos, it works fine. Even though it is not ready to merge yet, I would like to gather feedback and suggestions.
    
    I tried to keep everything as similar to the YARN client as possible. For an overview of how it works, see mesos_setup.md.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/skunert/incubator-flink mesos2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/251.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #251
    
----
commit bf6bedc0c831175aca1bfb6710ab9af1dec18cdb
Author: sebastian kunert <sk...@gmail.com>
Date:   2014-10-07T08:33:28Z

    initial mesos playaround commit

commit 6f40326d2408f2928d210c6c0a434068ecb63d8b
Author: sebastian kunert <sk...@gmail.com>
Date:   2014-10-14T14:26:39Z

    initial mesos playaround commit

commit ccd9938a68d74299eb56d148f706d20dc840b873
Author: sebastian kunert <sk...@gmail.com>
Date:   2014-10-26T16:19:40Z

    trying to start jobmanager

commit abd6decd70b836ac1b66762147031ef657a8c44c
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-10-29T14:51:26Z

    taskmanager and jobmanager starting

commit 2be207f0d080ebaefd9a6b3846682f694f1d6563
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-03T13:42:45Z

    moved mesos to a flink-addons module and created own profile

commit 091f21c45435448661f52318cc72defba48f4353
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-03T14:04:36Z

    fixed wrong string path

commit 03ff92609041b07d8f180d48867035ab10b9d666
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-05T16:25:51Z

    created mesos module and start scripts

commit e0a733382131094369924195e656a9e082e72bb2
Author: sebastian kunert <sk...@gmail.com>
Date:   2014-11-04T07:50:43Z

    added dynamic flink conf dir

commit 781468d3f4205d68141d06d377773e643150158e
Author: sebastian kunert <sk...@gmail.com>
Date:   2014-11-06T00:00:33Z

    exported flink into its own executor

commit 6c9425fcc835fe1e9a0e028f0ece988ed8708236
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-06T18:21:17Z

    introduced separate executors for the task and jobmanager, some code cleanup. Further cleanup and proper management of the managers required.

commit 9804cd4ae10aa16fd40cfdd952a630e66c662249
Author: sebastian kunert <sk...@gmail.com>
Date:   2014-11-07T17:07:54Z

    Added logging, advanced MesosUtils, code cleanup.

commit bfe9491b138ff4834431917fe264e05896febc21
Author: sebastian kunert <sk...@gmail.com>
Date:   2014-11-08T16:59:33Z

    added command line interface

commit 8b87f1c9876cbc56d5fc6c561b931eaa59b59178
Author: Sebastian Kunert <se...@wlan-141-23-108-41.tubit.tu-berlin.de>
Date:   2014-11-12T16:46:31Z

    added more command line options. It is now possible to limit memory and cpu usage. Also comments have been added to the code.

commit 626ab44c4408d69b9bcf2f7b3a1e1f5e46972094
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-13T16:58:38Z

    added documentation page (currently only clone of yarn documentation page) and adjusted the slots of taskmanagers according to the available CPUs

commit aeac39482c20fc6cce6c1f83bf28b0b1fc30e46e
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-14T11:03:01Z

    added better startup script

commit b1e508df2aead4227b06865dd82daa25e6ceeaa7
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-19T15:00:27Z

    implemented handling of task statuses, introduced protobuf serialization of config parameters, added help option, code cleanup, comments

commit 7b03bb55795587b2691b04944b065dfa393f8222
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-21T16:30:24Z

    implemented serialization of configuration with protobuf

commit 0dbd40f8e64979f3156fe3795de17fa8b629fea9
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-26T16:16:57Z

    better package structure, web frontend integration, dynamic property handling, JobManager Port offset, code cleanup

commit c82a5b5872a62882660090237c26fce329a029c2
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-11-27T14:29:51Z

    unified jobmanager executor and web frontend. better cpu handling

commit 20d4939c45970909bd87d83bf11fb4a72aecfe1f
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-12-03T12:35:11Z

    fixed some resource specifications in the scheduler

commit 139fd19ae38f95d7f417b8f912b555a610b374ba
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-12-03T12:47:11Z

    added better comments

commit 068b5809b4717f744bcc8a831f3f0e60c559ef85
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-12-03T13:12:25Z

    better documentation

commit 154910a2e0c12e5f0d73e512b0de3fd267191810
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-12-03T14:41:34Z

    Merge branch 'master' of github.com:apache/incubator-flink into mesos_module

commit 21776c5d5c53543fea0fa8a215ff5b46b5961874
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-12-03T14:46:26Z

    removed unchanged files

commit 6b595b2aae8704077f42052539ed20eb8de261e1
Author: Sebastian Kunert <sk...@gmail.com>
Date:   2014-12-04T12:26:40Z

    improved webinterface output and logging parameters in the start-mesos.sh script

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/251#issuecomment-66448210
  
    Cool. 
    I'll try it out again once you've addressed the bugs.


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/251#discussion_r21365524
  
    --- Diff: flink-addons/flink-mesos/src/main/java/org/apache/flink/mesos/Executors/FlinkExecutor.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.executors;
    +
    +import com.google.protobuf.InvalidProtocolBufferException;
    +import org.apache.flink.mesos.utility.FlinkProtos;
    +import org.apache.flink.mesos.utility.MesosConfiguration;
    +import org.apache.flink.mesos.utility.MesosUtils;
    +import org.apache.mesos.Executor;
    +import org.apache.mesos.ExecutorDriver;
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Base class for any Flink executors. It contains basic functionality such as retrieving the configuration
    + * from the ExecutorInfo and printing Log Information to the user.
    + */
    +public abstract class FlinkExecutor implements Executor {
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutor.class);
    +	private MesosConfiguration config = new MesosConfiguration();
    +	private String executorName;
    +
    +	@Override
    +	public void registered(ExecutorDriver driver, Protos.ExecutorInfo executorInfo, Protos.FrameworkInfo frameworkInfo, Protos.SlaveInfo slaveInfo) {
    +		this.executorName = executorInfo.getName();
    +
    +		LOG.info(executorInfo.getName() + " was registered on host " + slaveInfo.getHostname());
    +
    +		/*
    +		Retrieve the config that was serialized as Google ProtoBuf and passed to the ExecutorInfo
    +		 */
    +		FlinkProtos.Configuration protoConfig  = null;
    +
    +		try {
    +			protoConfig = FlinkProtos.Configuration.parseFrom(executorInfo.getData());
    +		} catch (InvalidProtocolBufferException e) {
    +			e.printStackTrace();
    +		}
    +		config.fromProtos(protoConfig);
    +		LOG.info(config.toString());
    --- End diff --
    
    This should probably be a debug log message. Otherwise, please add a descriptive message to it.
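    A minimal sketch of that suggestion, using `java.util.logging` only so the example is self-contained. The class name, the `describeConfig` helper, and the `Map` that stands in for `MesosConfiguration` are hypothetical. With SLF4J, as used in the PR, the equivalent call would be `LOG.debug("{} received configuration: {}", executorName, config)`.

    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.logging.Logger;

    public class ExecutorConfigLogging {
        private static final Logger LOG = Logger.getLogger(ExecutorConfigLogging.class.getName());

        // Builds a descriptive message instead of logging the bare config.toString().
        // The TreeMap copy gives a stable, sorted key order in the log line.
        static String describeConfig(String executorName, Map<String, String> config) {
            return executorName + " received configuration: " + new TreeMap<>(config);
        }

        // FINE is the java.util.logging level closest to DEBUG. The supplier is only
        // evaluated when FINE is enabled, so the message is not built otherwise.
        static void logConfig(String executorName, Map<String, String> config) {
            LOG.fine(() -> describeConfig(executorName, config));
        }

        public static void main(String[] args) {
            Map<String, String> config = new HashMap<>();
            config.put("taskmanager.numberOfTaskSlots", "8"); // hypothetical entry
            logConfig("FlinkJMExecutor", config);
            System.out.println(describeConfig("FlinkJMExecutor", config));
        }
    }
    ```

    Keeping the message construction in a separate method makes it testable without configuring a log handler.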


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/251#discussion_r21365545
  
    --- Diff: flink-addons/flink-mesos/src/main/java/org/apache/flink/mesos/Executors/FlinkExecutor.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.executors;
    +
    +import com.google.protobuf.InvalidProtocolBufferException;
    +import org.apache.flink.mesos.utility.FlinkProtos;
    +import org.apache.flink.mesos.utility.MesosConfiguration;
    +import org.apache.flink.mesos.utility.MesosUtils;
    +import org.apache.mesos.Executor;
    +import org.apache.mesos.ExecutorDriver;
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Base class for any Flink executors. It contains basic functionality such as retrieving the configuration
    + * from the ExecutorInfo and printing Log Information to the user.
    + */
    +public abstract class FlinkExecutor implements Executor {
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutor.class);
    +	private MesosConfiguration config = new MesosConfiguration();
    +	private String executorName;
    +
    +	@Override
    +	public void registered(ExecutorDriver driver, Protos.ExecutorInfo executorInfo, Protos.FrameworkInfo frameworkInfo, Protos.SlaveInfo slaveInfo) {
    +		this.executorName = executorInfo.getName();
    +
    +		LOG.info(executorInfo.getName() + " was registered on host " + slaveInfo.getHostname());
    +
    +		/*
    +		Retrieve the config that was serialized as Google ProtoBuf and passed to the ExecutorInfo
    +		 */
    +		FlinkProtos.Configuration protoConfig  = null;
    +
    +		try {
    +			protoConfig = FlinkProtos.Configuration.parseFrom(executorInfo.getData());
    +		} catch (InvalidProtocolBufferException e) {
    +			e.printStackTrace();
    +		}
    +		config.fromProtos(protoConfig);
    +		LOG.info(config.toString());
    +	}
    +
    +	@Override
    +	public void reregistered(ExecutorDriver driver, Protos.SlaveInfo slaveInfo) {
    +		LOG.info(executorName + " reregistered with Slave " + slaveInfo.getHostname());
    +	}
    +
    +	@Override
    +	public void disconnected(ExecutorDriver driver) {
    +		LOG.info(executorName + " was disconnected from Mesos Slave.");
    +	}
    +
    +	@Override
    +	public void killTask(ExecutorDriver driver, Protos.TaskID taskId) {
    +		MesosUtils.setTaskState(driver, taskId, Protos.TaskState.TASK_KILLED);
    +	}
    +
    +	@Override
    +	public void frameworkMessage(ExecutorDriver driver, byte[] data) {
    +
    --- End diff --
    
    What is this method for?


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by skunert <gi...@git.apache.org>.
Github user skunert commented on the pull request:

    https://github.com/apache/incubator-flink/pull/251#issuecomment-65627759
  
    I think I will soon have an account on the cluster in our building here, so that should not pose a problem. 


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/251#issuecomment-65761971
  
    I thought I had forgotten the `-n` option, but when I used it, I got this:
    ```
    jclouds@development-2518-a1c:~/flink-mesos-0.8-incubating-SNAPSHOT$ MESOS_NATIVE_LIBRARY="/usr/local/lib/libmesoso" ./bin/start-mesos.sh -l /usr/share/java/ -j lib/flink-dist-0.8-incubating-SNAPSHOT-mesos.jar -m 127.0.0.1:5050 -s 8 -n 2  -tm 2000 -v  -w
    08:46:19,539 INFO  org.apache.flink.mesos.core.MesosController                   - starting
    Exception in thread "main" org.apache.commons.cli.UnrecognizedOptionException: Unrecognized option: -n
    	at org.apache.commons.cli.Parser.processOption(Parser.java:363)
    	at org.apache.commons.cli.Parser.parse(Parser.java:199)
    	at org.apache.commons.cli.Parser.parse(Parser.java:85)
    	at org.apache.flink.mesos.core.MesosController.run(MesosController.java:161)
    	at org.apache.flink.mesos.core.MesosController.main(MesosController.java:267)
    ```
    
    
    Thank you again for working on the Mesos integration. Once you have tested it on a cluster yourself, I'll try it out again.


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/251#issuecomment-65627066
  
    Very nice.
    I'm trying to find time to try it out this week.
    
    If you need a cluster for testing Mesos, you could create a free account on Google Cloud Platform and run one from here: https://google.mesosphere.com/


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by skunert <gi...@git.apache.org>.
Github user skunert commented on the pull request:

    https://github.com/apache/incubator-flink/pull/251#issuecomment-65903905
  
    Thanks for your feedback, Robert. I will address the issues you raised and use the Google Mesos cluster for testing this week.


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/251#discussion_r21365607
  
    --- Diff: flink-addons/flink-mesos/src/main/java/org/apache/flink/mesos/Executors/FlinkJMExecutor.java ---
    @@ -0,0 +1,117 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.executors;
    +
    +import org.apache.flink.client.web.WebInterfaceServer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.mesos.utility.MesosConfiguration;
    +import org.apache.flink.mesos.utility.MesosConstants;
    +import org.apache.flink.mesos.utility.MesosUtils;
    +import org.apache.flink.runtime.jobmanager.JobManager;
    +import org.apache.mesos.ExecutorDriver;
    +import org.apache.mesos.MesosExecutorDriver;
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * The Executor class is responsible for executing the actual commands on the Mesos slaves.
    + */
    +public class FlinkJMExecutor extends FlinkExecutor {
    +	/**
    +	 * Thread that is started in launchTask(). It launches the JobManager.
    +	 */
    +	private class JobManagerThread extends Thread{
    +		private final ExecutorDriver executorDriver;
    +		private final Protos.TaskInfo taskInfo ;
    +		private final Logger LOG = LoggerFactory.getLogger(JobManagerThread.class);
    +		private final MesosConfiguration config;
    +
    +		public JobManagerThread(ExecutorDriver executorDriver, Protos.TaskInfo taskInfo, MesosConfiguration config) {
    +			this.executorDriver = executorDriver;
    +			this.taskInfo = taskInfo;
    +			this.config = config;
    +		}
    +
    +		@Override
    +		public void run() {
    +			LOG.info("Starting JM Thread");
    +			JobManager jobManager;
    +
    +			try	{
    +
    +				String[] args = {"-executionMode", "cluster"};
    +				String configDir = config.getString(MesosConstants.MESOS_CONF_DIR, null);
    +
    +				/*
    +				No configuration directory is passed to the Jobmanager because the
    +				initialize() function would reload the configuration into the
    +				GlobalConfiguration. Since we want to have flexibility to overwrite some of the configuration details
    +				in the configDir file, we load the configuration manually.
    +				 */
    +				if (configDir != null) {
    +					GlobalConfiguration.loadConfiguration(configDir);
    +				}
    +				GlobalConfiguration.includeConfiguration(this.config);
    +
    +				jobManager = JobManager.initialize(args);
    +				jobManager.startInfoServer();
    +
    +				if (config.getBoolean(MesosConstants.MESOS_USE_WEB, false)) {
    +					Configuration config = GlobalConfiguration.getConfiguration();
    +
    +					// add flink base dir to config
    +					config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..");
    +
    +					// get the listening port
    +					int port = config.getInteger(ConfigConstants.WEB_FRONTEND_PORT_KEY,
    +							ConfigConstants.DEFAULT_WEBCLIENT_PORT);
    +
    +					// start the server
    +					WebInterfaceServer server = new WebInterfaceServer(config, port);
    +					LOG.info("Starting web frontend server on port " + port + '.');
    +					server.start();
    +				}
    +			} catch (Exception e) {
    +				e.printStackTrace();
    --- End diff --
    
    I think we should log the exception instead of only printing the stack trace.
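    A minimal sketch of that change, assuming the startup work can be wrapped in a single step that may throw. `Startup`, `onFailure`, and the class name are hypothetical, and `java.util.logging` stands in for SLF4J, where the call would be `LOG.error("Failed to start the JobManager", e)`. Reporting the failure through a callback (for example, to mark the Mesos task as failed) goes beyond the review comment and is an assumption.

    ```java
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Consumer;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class LoggingJobManagerThread extends Thread {
        private static final Logger LOG = Logger.getLogger(LoggingJobManagerThread.class.getName());

        /** Hypothetical stand-in for the JobManager startup code inside run(). */
        interface Startup {
            void start() throws Exception;
        }

        private final Startup startup;
        // Hypothetical hook, e.g. to report the Mesos task as failed.
        private final Consumer<Exception> onFailure;

        LoggingJobManagerThread(Startup startup, Consumer<Exception> onFailure) {
            this.startup = startup;
            this.onFailure = onFailure;
        }

        @Override
        public void run() {
            try {
                startup.start();
            } catch (Exception e) {
                // Passing the exception to the logger routes the stack trace through the
                // configured log handlers instead of writing it straight to stderr.
                LOG.log(Level.SEVERE, "Failed to start the JobManager", e);
                onFailure.accept(e);
            }
        }

        public static void main(String[] args) throws InterruptedException {
            AtomicReference<Exception> failure = new AtomicReference<>();
            Thread thread = new LoggingJobManagerThread(
                    () -> { throw new IllegalStateException("port already in use"); },
                    failure::set);
            thread.start();
            thread.join();
            System.out.println("Reported failure: " + failure.get().getMessage());
        }
    }
    ```

    Without such a hook, the executor would keep running with no JobManager and Mesos would never learn that the task failed.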


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/251#issuecomment-65761740
  
    Okay, cool.
    
    I've built your code (the Maven side of things looks good) and tried it on a Mesosphere Google cluster.
    
    I got the following error:
    ```
    jclouds@development-2518-a1c:~/flink-mesos-0.8-incubating-SNAPSHOT$ MESOS_NATIVE_LIBRARY="/usr/local/lib/libmesoso" ./bin/start-mesos.sh -l /usr/share/java/ -j lib/flink-dist-0.8-incubating-SNAPSHOT-mesos.jar -m 127.0.0.1:5050 
    08:36:37,931 INFO  org.apache.flink.mesos.core.MesosController                   - starting
    flink.base.dir.path: /home/jclouds/flink-mesos-0.8-incubating-SNAPSHOT/bin/..
    Warning: MESOS_NATIVE_LIBRARY is deprecated, use MESOS_NATIVE_JAVA_LIBRARY instead. Future releases will not support JNI bindings via MESOS_NATIVE_LIBRARY.
    I1205 08:36:37.995919  4386 sched.cpp:137] Version: 0.21.0
    I1205 08:36:38.002944  4400 sched.cpp:234] New master detected at master@127.0.0.1:5050
    I1205 08:36:38.003168  4400 sched.cpp:242] No credentials provided. Attempting to register without authentication
    ```
    The problem is that nothing happens after this log message (I waited for 5 minutes), and the Mesos web interface does not show anything.


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by skunert <gi...@git.apache.org>.
Github user skunert commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/251#discussion_r21417364
  
    --- Diff: flink-addons/flink-mesos/src/main/java/org/apache/flink/mesos/Executors/FlinkExecutor.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.executors;
    +
    +import com.google.protobuf.InvalidProtocolBufferException;
    +import org.apache.flink.mesos.utility.FlinkProtos;
    +import org.apache.flink.mesos.utility.MesosConfiguration;
    +import org.apache.flink.mesos.utility.MesosUtils;
    +import org.apache.mesos.Executor;
    +import org.apache.mesos.ExecutorDriver;
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Base class for any Flink executors. It contains basic functionality such as retrieving the configuration
    + * from the ExecutorInfo and printing Log Information to the user.
    + */
    +public abstract class FlinkExecutor implements Executor {
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutor.class);
    +	private MesosConfiguration config = new MesosConfiguration();
    +	private String executorName;
    +
    +	@Override
    +	public void registered(ExecutorDriver driver, Protos.ExecutorInfo executorInfo, Protos.FrameworkInfo frameworkInfo, Protos.SlaveInfo slaveInfo) {
    +		this.executorName = executorInfo.getName();
    +
    +		LOG.info(executorInfo.getName() + " was registered on host " + slaveInfo.getHostname());
    +
    +		/*
    +		Retrieve the config that was serialized as Google ProtoBuf and passed to the ExecutorInfo
    +		 */
    +		FlinkProtos.Configuration protoConfig  = null;
    +
    +		try {
    +			protoConfig = FlinkProtos.Configuration.parseFrom(executorInfo.getData());
    +		} catch (InvalidProtocolBufferException e) {
    +			e.printStackTrace();
    +		}
    +		config.fromProtos(protoConfig);
    +		LOG.info(config.toString());
    +	}
    +
    +	@Override
    +	public void reregistered(ExecutorDriver driver, Protos.SlaveInfo slaveInfo) {
    +		LOG.info(executorName + " reregistered with Slave " + slaveInfo.getHostname());
    +	}
    +
    +	@Override
    +	public void disconnected(ExecutorDriver driver) {
    +		LOG.info(executorName + " was disconnected from Mesos Slave.");
    +	}
    +
    +	@Override
    +	public void killTask(ExecutorDriver driver, Protos.TaskID taskId) {
    +		MesosUtils.setTaskState(driver, taskId, Protos.TaskState.TASK_KILLED);
    +	}
    +
    +	@Override
    +	public void frameworkMessage(ExecutorDriver driver, byte[] data) {
    +
    --- End diff --
    
    The method is defined in the Mesos Executor interface (the Scheduler interface has a counterpart with the same name) and enables internal communication between the scheduler and the executors: the executor's version is called when the scheduler sends it a message. Our project does not use this channel at the moment, so the method is never called. Also, the Mesos documentation notes that these messages are delivered on a best-effort basis only, so they are not very reliable.
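    Because `frameworkMessage` is a required callback of the Mesos `Executor` interface, an implementation has to exist even when it is unused. A minimal sketch of making the no-op explicit, assuming only that the callback receives the raw message bytes. The Mesos `ExecutorDriver` parameter is omitted so the example stays self-contained, and the class name and counter are hypothetical.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.logging.Logger;

    public class IgnoringFrameworkMessageHandler {
        private static final Logger LOG =
                Logger.getLogger(IgnoringFrameworkMessageHandler.class.getName());

        // Counts ignored messages so unexpected traffic is at least visible.
        private final AtomicInteger ignored = new AtomicInteger();

        // Mirrors Executor#frameworkMessage(ExecutorDriver, byte[]) without the driver.
        public void frameworkMessage(byte[] data) {
            int count = ignored.incrementAndGet();
            // Delivery is best-effort, so nothing in the framework should depend on it.
            LOG.fine(() -> "Ignoring framework message #" + count + " (" + data.length + " bytes)");
        }

        public int ignoredCount() {
            return ignored.get();
        }

        public static void main(String[] args) {
            IgnoringFrameworkMessageHandler handler = new IgnoringFrameworkMessageHandler();
            handler.frameworkMessage(new byte[] {1, 2, 3});
            System.out.println("Ignored messages: " + handler.ignoredCount());
        }
    }
    ```

    An `AtomicInteger` is used because the callback may be invoked from a driver thread other than the one reading the count.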


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by skunert <gi...@git.apache.org>.
Github user skunert commented on the pull request:

    https://github.com/apache/incubator-flink/pull/251#issuecomment-66447805
  
    @rmetzger I am now able to test on a cluster and have found some bugs, which I am working on right now. However, the problem you described above has a different cause. The command you used was:
    ```
    MESOS_NATIVE_LIBRARY="/usr/local/lib/libmesoso" ./bin/start-mesos.sh -l /usr/share/java/ -j lib/flink-dist-0.8-incubating-SNAPSHOT-mesos.jar -m 127.0.0.1:5050
    ```
    But it should be something like this:
    ```
    ./bin/start-mesos.sh -j lib/flink-dist-0.8-incubating-SNAPSHOT-mesos.jar -m 10.86.206.126:5050 -l /usr/local/lib
    ```
    The `-l` option must point to the directory that contains the native Mesos library.
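    A minimal pre-flight check along those lines, sketched under the assumption that the launcher only needs the directory passed via `-l`. The class and method names are hypothetical and not part of `start-mesos.sh`. It resolves the platform-specific file name with `System.mapLibraryName` and prints a `MESOS_NATIVE_JAVA_LIBRARY` export, the variable recommended by the deprecation warning in the log above.

    ```java
    import java.io.File;

    public class MesosNativeLibraryCheck {
        // Maps a directory such as /usr/local/lib to the Mesos library file in it.
        // System.mapLibraryName("mesos") gives "libmesos.so" on Linux and
        // "libmesos.dylib" on macOS.
        static File resolveNativeLibrary(String libDir) {
            return new File(libDir, System.mapLibraryName("mesos"));
        }

        public static void main(String[] args) {
            String libDir = args.length > 0 ? args[0] : "/usr/local/lib";
            File library = resolveNativeLibrary(libDir);
            if (!library.isFile()) {
                System.err.println("Mesos native library not found at " + library
                        + "; check the directory passed via -l");
                System.exit(1);
            }
            System.out.println("export MESOS_NATIVE_JAVA_LIBRARY=" + library.getAbsolutePath());
        }
    }
    ```

    Failing fast with the resolved path would turn the silent hang described above into an actionable error, if the missing library is indeed the cause.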


[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/251#discussion_r21365625
  
    --- Diff: flink-addons/flink-mesos/src/main/java/org/apache/flink/mesos/Executors/FlinkJMExecutor.java ---
    @@ -0,0 +1,117 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.executors;
    +
    +import org.apache.flink.client.web.WebInterfaceServer;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.mesos.utility.MesosConfiguration;
    +import org.apache.flink.mesos.utility.MesosConstants;
    +import org.apache.flink.mesos.utility.MesosUtils;
    +import org.apache.flink.runtime.jobmanager.JobManager;
    +import org.apache.mesos.ExecutorDriver;
    +import org.apache.mesos.MesosExecutorDriver;
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * The Executor class is responsible for executing the actual commands on the Mesos slaves.
    + */
    +public class FlinkJMExecutor extends FlinkExecutor {
    +	/**
    +	 * Thread that is started in launchTask(). It launches the JobManager.
    +	 */
    +	private class JobManagerThread extends Thread{
    +		private final ExecutorDriver executorDriver;
    +		private final Protos.TaskInfo taskInfo ;
    +		private final Logger LOG = LoggerFactory.getLogger(JobManagerThread.class);
    +		private final MesosConfiguration config;
    +
    +		public JobManagerThread(ExecutorDriver executorDriver, Protos.TaskInfo taskInfo, MesosConfiguration config) {
    +			this.executorDriver = executorDriver;
    +			this.taskInfo = taskInfo;
    +			this.config = config;
    +		}
    +
    +		@Override
    +		public void run() {
    +			LOG.info("Starting JM Thread");
    +			JobManager jobManager;
    +
    +			try	{
    +
    +				String[] args = {"-executionMode", "cluster"};
    +				String configDir = config.getString(MesosConstants.MESOS_CONF_DIR, null);
    +
    +				/*
    +				No configuration directory is passed to the JobManager because the
    +				initialize() function would reload the configuration into the
    +				GlobalConfiguration. Since we want to have flexibility to overwrite some of the configuration details
    +				in the configDir file, we load the configuration manually.
    +				 */
    +				if (configDir != null) {
    +					GlobalConfiguration.loadConfiguration(configDir);
    +				}
    +				GlobalConfiguration.includeConfiguration(this.config);
    +
    +				jobManager = JobManager.initialize(args);
    +				jobManager.startInfoServer();
    +
    +				if (config.getBoolean(MesosConstants.MESOS_USE_WEB, false)) {
    +					Configuration config = GlobalConfiguration.getConfiguration();
    +
    +					// add flink base dir to config
    +					config.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..");
    +
    +					// get the listening port
    +					int port = config.getInteger(ConfigConstants.WEB_FRONTEND_PORT_KEY,
    +							ConfigConstants.DEFAULT_WEBCLIENT_PORT);
    +
    +					// start the server
    +					WebInterfaceServer server = new WebInterfaceServer(config, port);
    +					LOG.info("Starting web frontend server on port " + port + '.');
    +					server.start();
    +				}
    +			} catch (Exception e) {
    +				e.printStackTrace();
    +				MesosUtils.setTaskState(executorDriver, taskInfo.getTaskId(), Protos.TaskState.TASK_FAILED);
    +				this.interrupt();
    +			}
    +		}
    +	}
    +
    +	private final Logger LOG = LoggerFactory.getLogger(FlinkJMExecutor.class);
    --- End diff --
    
    We typically define the logger at the beginning of the class.
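Below is a sketch of the layout the reviewer describes. It follows the pattern that FlinkExecutor in the same pull request already uses: `private static final Logger LOG` is the first member, and inner classes and other fields come after it.

The sketch uses java.util.logging only so that it compiles without SLF4J on the classpath. In the Flink code the field would be an SLF4J Logger obtained from LoggerFactory.getLogger(...). LoggerPlacementSketch and its inner WorkerThread are hypothetical names used for illustration.

```java
import java.util.logging.Logger;

class LoggerPlacementSketch {
	// The logger is the first member of the class. It is static final, so one
	// instance is shared, and inner classes can use it without declaring their own.
	// (java.util.logging is a stand-in here for SLF4J's Logger/LoggerFactory.)
	private static final Logger LOG = Logger.getLogger(LoggerPlacementSketch.class.getName());

	private final String executorName;

	LoggerPlacementSketch(String executorName) {
		this.executorName = executorName;
	}

	// Hypothetical inner worker thread, declared after the logger and fields.
	private class WorkerThread extends Thread {
		private String result;

		@Override
		public void run() {
			LOG.info("Starting worker for " + executorName);
			result = "started " + executorName;
		}
	}

	String runWorker() {
		WorkerThread worker = new WorkerThread();
		worker.start();
		try {
			// join() makes the worker's write to result visible to this thread.
			worker.join();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IllegalStateException("Interrupted while waiting for the worker", e);
		}
		return worker.result;
	}

	static Logger logger() {
		return LOG;
	}

	public static void main(String[] args) {
		System.out.println(new LoggerPlacementSketch("jobmanager").runWorker());
	}
}
```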

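The code comment in the diff explains why the configuration is loaded by hand:

1. Values from the configDir file are read first.
2. The Mesos-side configuration is then merged on top, so it can overwrite file values.

Below is a minimal sketch of that layering with plain maps. It assumes, as the comment implies, that GlobalConfiguration.includeConfiguration replaces keys that are already present. Parsing `key: value` lines with java.util.Properties is only a stand-in for Flink's own loader. ConfigLayeringSketch, layer(...) and the example.* keys are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class ConfigLayeringSketch {

	/**
	 * Builds the effective configuration: file values first, then overrides on top.
	 * fileContents may be null, mirroring the "if (configDir != null)" guard in the diff.
	 */
	static Map<String, String> layer(String fileContents, Map<String, String> overrides) {
		Map<String, String> effective = new HashMap<>();
		if (fileContents != null) {
			Properties fromFile = new Properties();
			try {
				// Properties accepts "key: value" lines (stand-in for Flink's loader).
				fromFile.load(new StringReader(fileContents));
			} catch (IOException e) {
				throw new UncheckedIOException("Could not read the configuration file", e);
			}
			for (String key : fromFile.stringPropertyNames()) {
				effective.put(key, fromFile.getProperty(key));
			}
		}
		// Overrides are merged last, so putAll replaces any value loaded from the file.
		effective.putAll(overrides);
		return effective;
	}

	public static void main(String[] args) {
		Map<String, String> overrides = new HashMap<>();
		overrides.put("example.port", "6124");
		Map<String, String> effective = layer("example.port: 6123\nexample.heap.mb: 512\n", overrides);
		System.out.println(effective.get("example.port") + " " + effective.get("example.heap.mb"));
	}
}
```

The same null check matters for the base-dir line in the diff. When configDir is null, `configDir + "/.."` evaluates to the string "null/..", because Java string concatenation renders a null reference as "null".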

---

[GitHub] incubator-flink pull request: Mesos integration of Apache Flink

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/251#discussion_r21365505
  
    --- Diff: flink-addons/flink-mesos/src/main/java/org/apache/flink/mesos/Executors/FlinkExecutor.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.executors;
    +
    +import com.google.protobuf.InvalidProtocolBufferException;
    +import org.apache.flink.mesos.utility.FlinkProtos;
    +import org.apache.flink.mesos.utility.MesosConfiguration;
    +import org.apache.flink.mesos.utility.MesosUtils;
    +import org.apache.mesos.Executor;
    +import org.apache.mesos.ExecutorDriver;
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Base class for any Flink executors. It contains basic functionality such as retrieving the configuration
    + * from the ExecutorInfo and printing Log Information to the user.
    + */
    +public abstract class FlinkExecutor implements Executor {
    +	private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutor.class);
    +	private MesosConfiguration config = new MesosConfiguration();
    +	private String executorName;
    +
    +	@Override
    +	public void registered(ExecutorDriver driver, Protos.ExecutorInfo executorInfo, Protos.FrameworkInfo frameworkInfo, Protos.SlaveInfo slaveInfo) {
    +		this.executorName = executorInfo.getName();
    +
    +		LOG.info(executorInfo.getName() + " was registered on host " + slaveInfo.getHostname());
    +
    +		/*
    +		Retrieve the config that was serialized as Google ProtoBuf and passed to the ExecutorInfo
    +		 */
    +		FlinkProtos.Configuration protoConfig  = null;
    +
    +		try {
    +			protoConfig = FlinkProtos.Configuration.parseFrom(executorInfo.getData());
    +		} catch (InvalidProtocolBufferException e) {
    +			e.printStackTrace();
    --- End diff --
    
    Can you rethrow the exception as a RuntimeException?
    Just printing the stack trace to standard error will make debugging really hard.
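Below is a minimal sketch of the suggested change. It assumes a callback that cannot declare the checked exception: the registered() override in the diff catches it rather than declaring it. InvalidDataException and parse(...) are hypothetical stand-ins for InvalidProtocolBufferException and FlinkProtos.Configuration.parseFrom(...), so the snippet compiles without protobuf.

```java
import java.nio.charset.StandardCharsets;

final class RethrowSketch {

	/** Hypothetical checked exception standing in for InvalidProtocolBufferException. */
	static final class InvalidDataException extends Exception {
		InvalidDataException(String message) {
			super(message);
		}
	}

	/** Hypothetical parser: rejects empty payloads, otherwise decodes UTF-8 text. */
	static String parse(byte[] data) throws InvalidDataException {
		if (data == null || data.length == 0) {
			throw new InvalidDataException("empty executor data");
		}
		return new String(data, StandardCharsets.UTF_8);
	}

	/** Called from a callback that does not declare the checked exception. */
	static String retrieveConfig(byte[] data) {
		try {
			return parse(data);
		} catch (InvalidDataException e) {
			// Rethrow with context instead of e.printStackTrace(); the original
			// exception stays attached as the cause of the RuntimeException.
			throw new RuntimeException("Could not parse the configuration passed in the ExecutorInfo", e);
		}
	}

	public static void main(String[] args) {
		System.out.println(retrieveConfig("key=value".getBytes(StandardCharsets.UTF_8)));
	}
}
```

Passing e as the cause keeps the original stack trace attached. Whoever eventually logs the RuntimeException then sees where the parse actually failed.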


---