You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/02/04 23:42:37 UTC
[4/5] mesos git commit: Added an example executor based on the new V1
API.
Added an example executor based on the new V1 API.
This change adds a custom executor based on the new executor library.
Review: https://reviews.apache.org/r/42185/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8bb86676
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8bb86676
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8bb86676
Branch: refs/heads/master
Commit: 8bb8667602b78da74374e669a6ebfdb8e5d56972
Parents: e3c3cf8
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:41:33 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:41:33 2016 -0800
----------------------------------------------------------------------
src/Makefile.am | 5 +
src/examples/test_http_executor.cpp | 241 +++++++++++++++++++++++++++++++
2 files changed, 246 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8bb86676/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 420aa40..22f5131 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1601,6 +1601,11 @@ test_executor_SOURCES = examples/test_executor.cpp
test_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
test_executor_LDADD = libmesos.la $(LDADD)
+check_PROGRAMS += test-http-executor
+test_http_executor_SOURCES = examples/test_http_executor.cpp
+test_http_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+test_http_executor_LDADD = libmesos.la $(LDADD)
+
check_PROGRAMS += long-lived-framework
long_lived_framework_SOURCES = examples/long_lived_framework.cpp
long_lived_framework_CPPFLAGS = $(MESOS_CPPFLAGS)
http://git-wip-us.apache.org/repos/asf/mesos/blob/8bb86676/src/examples/test_http_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_http_executor.cpp b/src/examples/test_http_executor.cpp
new file mode 100644
index 0000000..4916e0e
--- /dev/null
+++ b/src/examples/test_http_executor.cpp
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <queue>
+#include <string>
+
+#include <mesos/http.hpp>
+
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/exit.hpp>
+#include <stout/linkedhashmap.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/uuid.hpp>
+
+using std::cout;
+using std::endl;
+using std::queue;
+using std::string;
+
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+using mesos::v1::TaskID;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskState;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
+using process::spawn;
+using process::wait;
+
+
+class TestExecutor: public process::Process<TestExecutor>
+{
+public:
+ TestExecutor(const FrameworkID& _frameworkId, const ExecutorID& _executorId)
+ : frameworkId(_frameworkId),
+ executorId(_executorId),
+ mesos(mesos::ContentType::PROTOBUF,
+ process::defer(self(), &Self::connected),
+ process::defer(self(), &Self::disconnected),
+ process::defer(self(), &Self::received, lambda::_1)),
+ state(DISCONNECTED) {}
+
+ void connected()
+ {
+ state = CONNECTED;
+
+ doReliableRegistration();
+ }
+
+ void doReliableRegistration()
+ {
+ if (state == SUBSCRIBED || state == DISCONNECTED) {
+ return;
+ }
+
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
+
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ // Send all unacknowledged updates.
+ foreach (const Call::Update& update, updates.values()) {
+ subscribe->add_unacknowledged_updates()->MergeFrom(update);
+ }
+
+ // Send all unacknowledged tasks.
+ foreach (const TaskInfo& task, tasks.values()) {
+ subscribe->add_unacknowledged_tasks()->MergeFrom(task);
+ }
+
+ mesos.send(call);
+
+ process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+ }
+
+ void disconnected()
+ {
+ state = DISCONNECTED;
+ }
+
+ void sendStatusUpdate(const TaskInfo& task, const TaskState& state)
+ {
+ UUID uuid = UUID::random();
+
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(task.task_id());
+ status.mutable_executor_id()->CopyFrom(executorId);
+ status.set_state(state);
+ status.set_source(TaskStatus::SOURCE_EXECUTOR);
+ status.set_uuid(uuid.toBytes());
+
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
+
+ call.set_type(Call::UPDATE);
+
+ call.mutable_update()->mutable_status()->CopyFrom(status);
+
+ // Capture the status update.
+ updates[uuid] = call.update();
+
+ mesos.send(call);
+ }
+
+ void received(queue<Event> events)
+ {
+ while (!events.empty()) {
+ Event event = events.front();
+ events.pop();
+
+ switch (event.type()) {
+ case Event::SUBSCRIBED: {
+ cout << "Received a SUBSCRIBED event" << endl;
+
+ state = SUBSCRIBED;
+ break;
+ }
+
+ case Event::LAUNCH: {
+ const TaskInfo& task = event.launch().task();
+ tasks[task.task_id()] = task;
+
+ cout << "Starting task " << task.task_id().value() << endl;
+
+ sendStatusUpdate(task, TaskState::TASK_RUNNING);
+
+ // This is where one would perform the requested task.
+
+ cout << "Finishing task " << task.task_id().value() << endl;
+
+ sendStatusUpdate(task, TaskState::TASK_FINISHED);
+ break;
+ }
+
+ case Event::KILL: {
+ cout << "Received a KILL event" << endl;
+ break;
+ }
+
+ case Event::ACKNOWLEDGED: {
+ cout << "Received an ACKNOWLEDGED event" << endl;
+
+ // Remove the corresponding update.
+ updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+ // Remove the corresponding task.
+ tasks.erase(event.acknowledged().task_id());
+ break;
+ }
+
+ case Event::MESSAGE: {
+ cout << "Received a MESSAGE event" << endl;
+ break;
+ }
+
+ case Event::SHUTDOWN: {
+ cout << "Received a SHUTDOWN event" << endl;
+ break;
+ }
+
+ case Event::ERROR: {
+ cout << "Received an ERROR event" << endl;
+ break;
+ }
+ }
+ }
+ }
+
+private:
+ const FrameworkID frameworkId;
+ const ExecutorID executorId;
+ Mesos mesos;
+ enum State
+ {
+ CONNECTED,
+ DISCONNECTED,
+ SUBSCRIBED
+ } state;
+
+ LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+ LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
+};
+
+
+int main()
+{
+ FrameworkID frameworkId;
+ ExecutorID executorId;
+
+ Option<string> value;
+
+ value = os::getenv("MESOS_FRAMEWORK_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+ }
+ frameworkId.set_value(value.get());
+
+ value = os::getenv("MESOS_EXECUTOR_ID");
+ if (value.isNone()) {
+ EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+ }
+ executorId.set_value(value.get());
+
+ process::Owned<TestExecutor> executor(
+ new TestExecutor(frameworkId, executorId));
+
+ process::spawn(executor.get());
+ process::wait(executor.get());
+
+ return EXIT_SUCCESS;
+}