You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/02/04 23:42:37 UTC

[4/5] mesos git commit: Added an example executor based on the new V1 API.

Added an example executor based on the new V1 API.

This change adds a custom executor based on the new executor library.

Review: https://reviews.apache.org/r/42185/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8bb86676
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8bb86676
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8bb86676

Branch: refs/heads/master
Commit: 8bb8667602b78da74374e669a6ebfdb8e5d56972
Parents: e3c3cf8
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Thu Feb 4 14:41:33 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Thu Feb 4 14:41:33 2016 -0800

----------------------------------------------------------------------
 src/Makefile.am                     |   5 +
 src/examples/test_http_executor.cpp | 241 +++++++++++++++++++++++++++++++
 2 files changed, 246 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8bb86676/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 420aa40..22f5131 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1601,6 +1601,11 @@ test_executor_SOURCES = examples/test_executor.cpp
 test_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
 test_executor_LDADD = libmesos.la $(LDADD)
 
+check_PROGRAMS += test-http-executor
+test_http_executor_SOURCES = examples/test_http_executor.cpp
+test_http_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+test_http_executor_LDADD = libmesos.la $(LDADD)
+
 check_PROGRAMS += long-lived-framework
 long_lived_framework_SOURCES = examples/long_lived_framework.cpp
 long_lived_framework_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/8bb86676/src/examples/test_http_executor.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_http_executor.cpp b/src/examples/test_http_executor.cpp
new file mode 100644
index 0000000..4916e0e
--- /dev/null
+++ b/src/examples/test_http_executor.cpp
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <queue>
+#include <string>
+
+#include <mesos/http.hpp>
+
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/exit.hpp>
+#include <stout/linkedhashmap.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/uuid.hpp>
+
+using std::cout;
+using std::endl;
+using std::queue;
+using std::string;
+
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+using mesos::v1::TaskID;
+using mesos::v1::TaskInfo;
+using mesos::v1::TaskState;
+using mesos::v1::TaskStatus;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
+using process::spawn;
+using process::wait;
+
+
+// An example executor built on the V1 (HTTP) executor library.
+//
+// The executor subscribes with the agent, retrying once per second
+// until a SUBSCRIBED event arrives. Every launched task is reported
+// as TASK_RUNNING and then immediately as TASK_FINISHED. Status
+// updates and tasks are remembered until they are acknowledged so
+// that they can be replayed in a later SUBSCRIBE call.
+class TestExecutor: public process::Process<TestExecutor>
+{
+public:
+  // Creates the executor for the given framework/executor pair. The
+  // three callbacks handed to the `Mesos` library are deferred onto
+  // this process, and the connection starts out as DISCONNECTED.
+  TestExecutor(const FrameworkID& _frameworkId, const ExecutorID& _executorId)
+    : frameworkId(_frameworkId),
+      executorId(_executorId),
+      mesos(mesos::ContentType::PROTOBUF,
+            process::defer(self(), &Self::connected),
+            process::defer(self(), &Self::disconnected),
+            process::defer(self(), &Self::received, lambda::_1)),
+      state(DISCONNECTED) {}
+
+  // Callback invoked by the library once a connection is established;
+  // records the new state and kicks off the subscription attempts.
+  void connected()
+  {
+    state = CONNECTED;
+
+    doReliableRegistration();
+  }
+
+  // Sends a SUBSCRIBE call carrying all unacknowledged updates and
+  // tasks, then reschedules itself one second later. The retry stops
+  // by itself once the executor is SUBSCRIBED or DISCONNECTED.
+  void doReliableRegistration()
+  {
+    if (state == SUBSCRIBED || state == DISCONNECTED) {
+      return;
+    }
+
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
+
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    // Send all unacknowledged updates.
+    foreach (const Call::Update& update, updates.values()) {
+      subscribe->add_unacknowledged_updates()->MergeFrom(update);
+    }
+
+    // Send all unacknowledged tasks.
+    foreach (const TaskInfo& task, tasks.values()) {
+      subscribe->add_unacknowledged_tasks()->MergeFrom(task);
+    }
+
+    mesos.send(call);
+
+    // Keep retrying until the guard at the top of this function
+    // observes SUBSCRIBED (or DISCONNECTED).
+    process::delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  // Callback invoked by the library when the connection is lost.
+  void disconnected()
+  {
+    state = DISCONNECTED;
+  }
+
+  // Sends an UPDATE call reporting `state` for `task` and remembers
+  // the update, keyed by a freshly generated UUID, until it is
+  // acknowledged.
+  //
+  // NOTE: The `state` parameter shadows the `state` member inside
+  // this function; only the task state is used here.
+  void sendStatusUpdate(const TaskInfo& task, const TaskState& state)
+  {
+    UUID uuid = UUID::random();
+
+    TaskStatus status;
+    status.mutable_task_id()->CopyFrom(task.task_id());
+    status.mutable_executor_id()->CopyFrom(executorId);
+    status.set_state(state);
+    status.set_source(TaskStatus::SOURCE_EXECUTOR);
+    status.set_uuid(uuid.toBytes());
+
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
+
+    call.set_type(Call::UPDATE);
+
+    call.mutable_update()->mutable_status()->CopyFrom(status);
+
+    // Capture the status update.
+    updates[uuid] = call.update();
+
+    mesos.send(call);
+  }
+
+  // Callback invoked by the library with a batch of events; the
+  // events are handled one at a time in queue order.
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          cout << "Received a SUBSCRIBED event" << endl;
+
+          // Stops the retry loop in `doReliableRegistration()`.
+          state = SUBSCRIBED;
+          break;
+        }
+
+        case Event::LAUNCH: {
+          const TaskInfo& task = event.launch().task();
+
+          // Remember the task until one of its updates is acknowledged.
+          tasks[task.task_id()] = task;
+
+          cout << "Starting task " << task.task_id().value() << endl;
+
+          sendStatusUpdate(task, TaskState::TASK_RUNNING);
+
+          // This is where one would perform the requested task.
+
+          cout << "Finishing task " << task.task_id().value() << endl;
+
+          sendStatusUpdate(task, TaskState::TASK_FINISHED);
+          break;
+        }
+
+        case Event::KILL: {
+          cout << "Received a KILL event" << endl;
+          break;
+        }
+
+        case Event::ACKNOWLEDGED: {
+          cout << "Received an ACKNOWLEDGED event" << endl;
+
+          // Remove the corresponding update.
+          updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+          // Remove the corresponding task.
+          tasks.erase(event.acknowledged().task_id());
+          break;
+        }
+
+        case Event::MESSAGE: {
+          cout << "Received a MESSAGE event" << endl;
+          break;
+        }
+
+        case Event::SHUTDOWN: {
+          cout << "Received a SHUTDOWN event" << endl;
+
+          // An executor is expected to exit once it is told to shut
+          // down. Every task is already reported as TASK_FINISHED in
+          // the LAUNCH handler, so there is nothing left to kill;
+          // terminating this process lets `process::wait()` in
+          // `main()` return instead of blocking forever.
+          process::terminate(self());
+          break;
+        }
+
+        case Event::ERROR: {
+          cout << "Received an ERROR event" << endl;
+          break;
+        }
+      }
+    }
+  }
+
+private:
+  const FrameworkID frameworkId;
+  const ExecutorID executorId;
+
+  // Handle to the executor library used to send calls.
+  Mesos mesos;
+
+  // Connection/subscription state, driven by the library callbacks
+  // and by the SUBSCRIBED event.
+  enum State
+  {
+    CONNECTED,
+    DISCONNECTED,
+    SUBSCRIBED
+  } state;
+
+  LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+  LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
+};
+
+
+// Entry point: reads the framework and executor IDs from the
+// environment, then spawns a `TestExecutor` and blocks until that
+// process has terminated.
+int main()
+{
+  // Both IDs are mandatory; bail out with a message if one is missing.
+  const Option<string> frameworkValue = os::getenv("MESOS_FRAMEWORK_ID");
+  if (frameworkValue.isNone()) {
+    EXIT(1) << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+  }
+
+  const Option<string> executorValue = os::getenv("MESOS_EXECUTOR_ID");
+  if (executorValue.isNone()) {
+    EXIT(1) << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+  }
+
+  // Wrap the raw strings in the protobuf ID types the executor expects.
+  FrameworkID frameworkId;
+  frameworkId.set_value(frameworkValue.get());
+
+  ExecutorID executorId;
+  executorId.set_value(executorValue.get());
+
+  process::Owned<TestExecutor> executor(
+      new TestExecutor(frameworkId, executorId));
+
+  // Run the executor and wait for its process to finish.
+  process::spawn(executor.get());
+  process::wait(executor.get());
+
+  return EXIT_SUCCESS;
+}