You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/08/02 15:42:48 UTC

[6/7] mesos git commit: Added native implementation for the V0 Mesos Adapter.

Added native implementation for the V0 Mesos Adapter.

This change adds the C++ implementation for the JAVA V0 to V1 Mesos
implementation adapter (driver + scheduler).

Review: https://reviews.apache.org/r/50253


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/68c27d10
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/68c27d10
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/68c27d10

Branch: refs/heads/master
Commit: 68c27d1095dd33e569a96a05337dbd961cfdd6fe
Parents: bca68f6
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 23:23:05 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                                 |   9 +-
 .../org_apache_mesos_v1_scheduler_V0Mesos.cpp   | 893 +++++++++++++++++++
 2 files changed, 901 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/68c27d10/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 2b02b5f..b9cc040 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1563,6 +1563,7 @@ libjava_la_SOURCES =							\
   java/jni/org_apache_mesos_state_Variable.cpp				\
   java/jni/org_apache_mesos_state_ZooKeeperState.cpp			\
   java/jni/org_apache_mesos_v1_scheduler_JNIMesos.cpp			\
+  java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp			\
   jvm/jvm.cpp								\
   jvm/jvm.hpp								\
   jvm/java/io.hpp							\
@@ -1605,7 +1606,8 @@ nodist_libjava_la_SOURCES =						\
   java/jni/org_apache_mesos_state_LogState.h				\
   java/jni/org_apache_mesos_state_Variable.h				\
   java/jni/org_apache_mesos_state_ZooKeeperState.h			\
-  java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h
+  java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h			\
+  java/jni/org_apache_mesos_v1_scheduler_V0Mesos.h
 
 BUILT_SOURCES += $(nodist_libjava_la_SOURCES)
 
@@ -1654,6 +1656,11 @@ java/jni/org_apache_mesos_v1_scheduler_JNIMesos.h: $(MESOS_JAR)
 	-classpath $(MESOS_JAR):@PROTOBUF_JAR@				\
 	  org.apache.mesos.v1.scheduler.JNIMesos
 
+java/jni/org_apache_mesos_v1_scheduler_V0Mesos.h: $(MESOS_JAR)
+	$(JAVA_HOME)/bin/javah -d java/jni				\
+	-classpath $(MESOS_JAR):@PROTOBUF_JAR@				\
+	  org.apache.mesos.v1.scheduler.V0Mesos
+
 $(EXAMPLES_JAR): $(EXAMPLES_SOURCE)
 	@echo "Building examples.jar ..."
 	$(MKDIR_P) examples/java

http://git-wip-us.apache.org/repos/asf/mesos/blob/68c27d10/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
----------------------------------------------------------------------
diff --git a/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
new file mode 100644
index 0000000..7febe95
--- /dev/null
+++ b/src/java/jni/org_apache_mesos_v1_scheduler_V0Mesos.cpp
@@ -0,0 +1,893 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <queue>
+#include <string>
+#include <vector>
+
+#include <mesos/mesos.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <mesos/v1/scheduler.hpp>
+
+#include <mesos/v1/scheduler/scheduler.hpp>
+
+#include <process/clock.hpp>
+#include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/id.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/check.hpp>
+#include <stout/exit.hpp>
+#include <stout/unreachable.hpp>
+
+#include "internal/devolve.hpp"
+#include "internal/evolve.hpp"
+
+#include "jvm/jvm.hpp"
+
+#include "master/constants.hpp"
+#include "master/validation.hpp"
+
+#include "convert.hpp"
+#include "construct.hpp"
+#include "org_apache_mesos_v1_scheduler_V0Mesos.h"
+
+using namespace mesos;
+using namespace mesos::internal::master;
+
+using std::queue;
+using std::string;
+using std::vector;
+
+using mesos::internal::devolve;
+using mesos::internal::evolve;
+
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+
+using process::Clock;
+using process::Owned;
+using process::Timer;
+
+class V0ToV1AdapterProcess; // Forward declaration.
+
+// This interface acts as an adapter from the v0 (driver + scheduler) to the
+// v1 Mesos scheduler.
+class V0ToV1Adapter : public mesos::Scheduler, public v1::scheduler::MesosBase
+{
+public:
+  V0ToV1Adapter(
+      JNIEnv* env,
+      jweak jmesos,
+      const FrameworkInfo& frameworkInfo,
+      const string& master,
+      const Option<Credential>& credential);
+
+  virtual ~V0ToV1Adapter();
+
+  // v0 Scheduler interface overrides.
+  virtual void registered(
+      SchedulerDriver* driver,
+      const FrameworkID& frameworkId,
+      const MasterInfo& masterInfo) override;
+
+  virtual void reregistered(
+      SchedulerDriver* driver,
+      const MasterInfo& masterInfo) override;
+
+  virtual void disconnected(SchedulerDriver* driver) override;
+
+  virtual void resourceOffers(
+      SchedulerDriver* driver,
+      const vector<Offer>& offers) override;
+
+  virtual void offerRescinded(
+      SchedulerDriver* driver,
+      const OfferID& offerId) override;
+
+  virtual void statusUpdate(
+      SchedulerDriver* driver,
+      const TaskStatus& status) override;
+
+  virtual void frameworkMessage(
+      SchedulerDriver* driver,
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      const string& data) override;
+
+  virtual void slaveLost(
+      SchedulerDriver* driver,
+      const SlaveID& slaveId) override;
+
+  virtual void executorLost(
+      SchedulerDriver* driver,
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      int status) override;
+
+  virtual void error(
+      SchedulerDriver* driver,
+      const string& message) override;
+
+  // v1 MesosBase interface overrides.
+  virtual void send(const v1::scheduler::Call& call) override;
+
+  virtual void reconnect() override
+  {
+    // The driver does not support explicit reconnection with the master.
+    UNREACHABLE();
+  }
+
+  process::Owned<V0ToV1AdapterProcess> process;
+
+private:
+  Owned<MesosSchedulerDriver> driver;
+};
+
+
+// The process (below) is responsible for ensuring synchronized access between
+// callbacks received from the driver and calls invoked by the adapter.
+class V0ToV1AdapterProcess : public process::Process<V0ToV1AdapterProcess>
+{
+public:
+  V0ToV1AdapterProcess(JNIEnv* env, jweak jmesos);
+
+  virtual ~V0ToV1AdapterProcess() = default;
+
+  void registered(const FrameworkID& frameworkId);
+
+  void reregistered();
+
+  void disconnected();
+
+  void resourceOffers(const vector<Offer>& offers);
+
+  void offerRescinded(const OfferID& offerId);
+
+  void statusUpdate(const TaskStatus& status);
+
+  void frameworkMessage(
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      const string& data);
+
+  void slaveLost(const SlaveID& slaveId);
+
+  void executorLost(
+      const ExecutorID& executorId,
+      const SlaveID& slaveId,
+      int status);
+
+  void error(const string& message);
+
+  void send(SchedulerDriver*, const v1::scheduler::Call& call);
+
+  JavaVM* jvm;
+  JNIEnv* env;
+  jweak jmesos;
+
+protected:
+  void received(const Event& event);
+
+  void _received();
+
+  void __received(const Event& event);
+
+  void heartbeat();
+
+  void disconnect();
+
+private:
+  bool subscribeCall;
+  const Duration interval;
+  queue<Event> pending;
+  Option<FrameworkID> frameworkId;
+  Option<Timer> heartbeatTimer;
+};
+
+
+V0ToV1Adapter::V0ToV1Adapter(
+    JNIEnv* env,
+    jweak jmesos,
+    const FrameworkInfo& frameworkInfo,
+    const string& master,
+    const Option<Credential>& credential)
+  : process(new V0ToV1AdapterProcess(env, jmesos))
+{
+  spawn(process.get());
+
+  driver.reset(
+      credential.isSome()
+        // Disable implicit acks.
+        ? new MesosSchedulerDriver(
+              this, frameworkInfo, master, false, credential.get())
+        : new MesosSchedulerDriver(this, frameworkInfo, master, false));
+
+  driver->start();
+}
+
+
+V0ToV1Adapter::~V0ToV1Adapter()
+{
+  terminate(process.get());
+  wait(process.get());
+}
+
+
+void V0ToV1Adapter::error(
+    SchedulerDriver*,
+    const string& message)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::error, message);
+}
+
+
+void V0ToV1Adapter::executorLost(
+    SchedulerDriver*,
+    const ExecutorID& executorId,
+    const SlaveID& slaveId,
+    int status)
+{
+  process::dispatch(
+      process.get(),
+      &V0ToV1AdapterProcess::executorLost,
+      executorId,
+      slaveId,
+      status);
+}
+
+
+void V0ToV1Adapter::slaveLost(
+    SchedulerDriver*,
+    const SlaveID& slaveId)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::slaveLost, slaveId);
+}
+
+
+void V0ToV1Adapter::frameworkMessage(
+    SchedulerDriver*,
+    const ExecutorID& executorId,
+    const SlaveID& slaveId,
+    const string& data)
+{
+  process::dispatch(
+      process.get(),
+      &V0ToV1AdapterProcess::frameworkMessage,
+      executorId,
+      slaveId,
+      data);
+}
+
+
+void V0ToV1Adapter::statusUpdate(
+    SchedulerDriver*,
+    const TaskStatus& status)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::statusUpdate, status);
+}
+
+
+void V0ToV1Adapter::offerRescinded(
+    SchedulerDriver*,
+    const OfferID& offerId)
+{
+  process::dispatch(
+      process.get(), &V0ToV1AdapterProcess::offerRescinded, offerId);
+}
+
+
+void V0ToV1Adapter::resourceOffers(
+    SchedulerDriver*,
+    const vector<Offer>& offers)
+{
+  process::dispatch(
+      process.get(), &V0ToV1AdapterProcess::resourceOffers, offers);
+}
+
+
+void V0ToV1Adapter::registered(
+    SchedulerDriver*,
+    const FrameworkID &frameworkId,
+    const MasterInfo&)
+{
+  process::dispatch(
+      process.get(),
+      &V0ToV1AdapterProcess::registered,
+      frameworkId);
+}
+
+
+void V0ToV1Adapter::reregistered(
+    SchedulerDriver*,
+    const MasterInfo&)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::reregistered);
+}
+
+
+void V0ToV1Adapter::disconnected(SchedulerDriver*)
+{
+  process::dispatch(process.get(), &V0ToV1AdapterProcess::disconnected);
+}
+
+
+void V0ToV1Adapter::send(const Call& call)
+{
+  process::dispatch(
+      process.get(), &V0ToV1AdapterProcess::send, driver.get(), call);
+}
+
+
+V0ToV1AdapterProcess::V0ToV1AdapterProcess(
+    JNIEnv* _env,
+    jweak _jmesos)
+  : ProcessBase(process::ID::generate("SchedulerV0ToV1Adapter")),
+    jvm(nullptr),
+    env(_env),
+    jmesos(_jmesos),
+    subscribeCall(false),
+    interval(DEFAULT_HEARTBEAT_INTERVAL)
+{
+  env->GetJavaVM(&jvm);
+}
+
+
+void V0ToV1AdapterProcess::registered(const FrameworkID& _frameworkId)
+{
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), NULL);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.connected(mesos);
+  jmethodID connected =
+    env->GetMethodID(clazz, "connected",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+  env->ExceptionClear();
+
+  env->CallVoidMethod(jscheduler, connected, jmesos);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `connected` call");
+  }
+
+  jvm->DetachCurrentThread();
+
+  // We need this copy to populate the fields in `Event::Subscribed` upon
+  // receiving a `reregistered()` callback later.
+  frameworkId = _frameworkId;
+
+  // These events are queued and delivered to the scheduler upon receiving the
+  // subscribe call later. See comments in `send()` for more details.
+  {
+    Event event;
+    event.set_type(Event::SUBSCRIBED);
+
+    Event::Subscribed* subscribed = event.mutable_subscribed();
+
+    subscribed->mutable_framework_id()->CopyFrom(evolve(frameworkId.get()));
+
+    subscribed->set_heartbeat_interval_seconds(interval.secs());
+
+    received(event);
+  }
+
+  {
+    Event event;
+    event.set_type(Event::HEARTBEAT);
+
+    received(event);
+  }
+}
+
+
+void V0ToV1AdapterProcess::reregistered()
+{
+  CHECK_SOME(frameworkId);
+  registered(frameworkId.get());
+}
+
+
+void V0ToV1AdapterProcess::disconnected()
+{
+  disconnect();
+
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), NULL);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.disconnected(mesos);
+  jmethodID disconnected =
+    env->GetMethodID(clazz, "disconnected",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;)V");
+
+  env->ExceptionClear();
+
+  env->CallVoidMethod(jmesos, disconnected);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `disconnected` call");
+  }
+
+  jvm->DetachCurrentThread();
+}
+
+
+void V0ToV1AdapterProcess::resourceOffers(const vector<Offer>& _offers)
+{
+  Event event;
+  event.set_type(Event::OFFERS);
+
+  Event::Offers* offers = event.mutable_offers();
+
+  foreach (const Offer& offer, _offers) {
+    offers->add_offers()->CopyFrom(evolve(offer));
+  }
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::offerRescinded(const OfferID& offerId)
+{
+  Event event;
+  event.set_type(Event::RESCIND);
+
+  event.mutable_rescind()->mutable_offer_id()->
+    CopyFrom(evolve(offerId));
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::statusUpdate(const TaskStatus& status)
+{
+  Event event;
+  event.set_type(Event::UPDATE);
+
+  event.mutable_update()->mutable_status()->
+    CopyFrom(mesos::internal::evolve(status));
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::frameworkMessage(
+    const ExecutorID& executorId,
+    const SlaveID& slaveId,
+    const string& data)
+{
+  Event event;
+  event.set_type(Event::MESSAGE);
+
+  event.mutable_message()->mutable_agent_id()->
+    CopyFrom(mesos::internal::evolve(slaveId));
+
+  event.mutable_message()->mutable_executor_id()->
+    CopyFrom(mesos::internal::evolve(executorId));
+
+  event.mutable_message()->set_data(data.data());
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::slaveLost(const SlaveID& slaveId)
+{
+  Event event;
+  event.set_type(Event::FAILURE);
+
+  event.mutable_failure()->mutable_agent_id()->
+    CopyFrom(mesos::internal::evolve(slaveId));
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::executorLost(
+    const ExecutorID& executorId,
+    const SlaveID& slaveId,
+    int status)
+{
+  Event event;
+  event.set_type(Event::FAILURE);
+
+  event.mutable_failure()->mutable_agent_id()->
+    CopyFrom(mesos::internal::evolve(slaveId));
+
+  event.mutable_failure()->mutable_executor_id()->
+    CopyFrom(mesos::internal::evolve(executorId));
+
+  event.mutable_failure()->set_status(status);
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::error(const string& message)
+{
+  Event event;
+  event.set_type(Event::ERROR);
+
+  event.mutable_error()->set_message(message);
+
+  received(event);
+}
+
+
+void V0ToV1AdapterProcess::send(SchedulerDriver* driver, const Call& _call)
+{
+  CHECK_NOTNULL(driver);
+
+  scheduler::Call call = devolve(_call);
+
+  Option<Error> error = validation::scheduler::call::validate(call);
+  if (error.isSome()) {
+    LOG(WARNING) << "Dropping " << call.type() << ": due to error "
+                 << error->message;
+    return;
+  }
+
+  switch (call.type()) {
+    case Call::SUBSCRIBE: {
+      subscribeCall = true;
+
+      heartbeatTimer = process::delay(interval, self(), &Self::heartbeat);
+
+      // The driver subscribes implicitly with the master upon initialization.
+      // For compatibility with the v1 interface, send the already enqueued
+      // subscribed event upon receiving the subscribe request.
+      _received();
+      break;
+    }
+
+    case Call::TEARDOWN: {
+      driver->stop(false);
+      break;
+    }
+
+    case Call::ACCEPT: {
+      vector<OfferID> offerIds;
+      foreach (const OfferID& offerId, call.accept().offer_ids()) {
+        offerIds.emplace_back(offerId);
+      }
+
+      vector<Offer::Operation> operations;
+      foreach (const Offer::Operation& operation, call.accept().operations()) {
+        operations.emplace_back(operation);
+      }
+
+      if (call.accept().has_filters()) {
+        driver->acceptOffers(offerIds, operations, call.accept().filters());
+      } else {
+        driver->acceptOffers(offerIds, operations);
+      }
+
+      break;
+    }
+
+    case Call::ACCEPT_INVERSE_OFFERS:
+    case Call::DECLINE_INVERSE_OFFERS:
+    case Call::SHUTDOWN: {
+      // TODO(anand): Throw java error.
+      LOG(ERROR) << "Received an unexpected " << call.type() << " call";
+      break;
+    }
+
+    case Call::DECLINE: {
+      foreach (const OfferID& offerId, call.decline().offer_ids()) {
+        if (call.decline().has_filters()) {
+          driver->declineOffer(offerId, call.decline().filters());
+        } else {
+          driver->declineOffer(offerId);
+        }
+      }
+
+      break;
+    }
+
+    case Call::REVIVE: {
+      driver->reviveOffers();
+      break;
+    }
+
+    case Call::KILL: {
+      driver->killTask(call.kill().task_id());
+      break;
+    }
+
+    case Call::ACKNOWLEDGE: {
+      TaskStatus status;
+      status.mutable_task_id()->CopyFrom(call.acknowledge().task_id());
+      status.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id());
+      status.set_uuid(call.acknowledge().uuid());
+
+      driver->acknowledgeStatusUpdate(status);
+      break;
+    }
+
+    case Call::RECONCILE: {
+      vector<TaskStatus> statuses;
+
+      foreach (const scheduler::Call::Reconcile::Task& task,
+               call.reconcile().tasks()) {
+        TaskStatus status;
+        status.mutable_task_id()->CopyFrom(task.task_id());
+        statuses.emplace_back(status);
+      }
+
+      driver->reconcileTasks(statuses);
+      break;
+    }
+
+    case Call::MESSAGE: {
+      driver->sendFrameworkMessage(
+          call.message().executor_id(),
+          call.message().slave_id(),
+          string(call.message().data()));
+      break;
+    }
+
+    case Call::REQUEST: {
+      vector<Request> requests;
+
+      foreach (const Request& request, call.request().requests()) {
+        requests.emplace_back(request);
+      }
+
+      driver->requestResources(requests);
+      break;
+    }
+
+    case Call::SUPPRESS: {
+      driver->suppressOffers();
+      break;
+    }
+
+    case Call::UNKNOWN: {
+      EXIT(EXIT_FAILURE) << "Received an unexpected " << call.type()
+                         << " call";
+      break;
+    }
+  }
+}
+
+
+void V0ToV1AdapterProcess::received(const Event& event)
+{
+  // For compatibility with the v1 interface, we only start sending events
+  // once the scheduler has sent the subscribe call.
+  if (!subscribeCall) {
+    pending.push(event);
+    return;
+  }
+
+  pending.push(event);
+
+  _received();
+}
+
+
+void V0ToV1AdapterProcess::_received()
+{
+  CHECK(subscribeCall);
+
+  while (!pending.empty()) {
+    __received(pending.front());
+    pending.pop();
+  }
+}
+
+
+void V0ToV1AdapterProcess::__received(const Event& event)
+{
+  jvm->AttachCurrentThread(JNIENV_CAST(&env), nullptr);
+
+  jclass clazz = env->GetObjectClass(jmesos);
+
+  jfieldID scheduler =
+    env->GetFieldID(clazz, "scheduler",
+                    "Lorg/apache/mesos/v1/scheduler/Scheduler;");
+
+  jobject jscheduler = env->GetObjectField(jmesos, scheduler);
+
+  clazz = env->GetObjectClass(jscheduler);
+
+  // scheduler.received(mesos, event);
+  jmethodID received =
+    env->GetMethodID(clazz, "received",
+                     "(Lorg/apache/mesos/v1/scheduler/Mesos;"
+                     "Lorg/apache/mesos/v1/scheduler/Protos$Event;)V");
+
+  jobject jevent = convert<Event>(env, event);
+
+  env->ExceptionClear();
+
+  env->CallVoidMethod(jscheduler, received, jmesos, jevent);
+
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    jvm->DetachCurrentThread();
+    ABORT("Exception thrown during `received` call");
+  }
+
+  jvm->DetachCurrentThread();
+}
+
+
+void V0ToV1AdapterProcess::heartbeat()
+{
+  // It is possible that we were unable to cancel this timer upon a
+  // disconnection. If this occurs, don't bother sending the heartbeat
+  // event.
+  if (heartbeatTimer.isNone() || !heartbeatTimer->timeout().expired()) {
+    return;
+  }
+
+  CHECK(subscribeCall)
+    << "Cannot send heartbeat events to the scheduler without receiving a "
+    << "subscribe call";
+
+  Event event;
+  event.set_type(Event::HEARTBEAT);
+
+  received(event);
+
+  heartbeatTimer = process::delay(interval, self(), &Self::heartbeat);
+}
+
+
+void V0ToV1AdapterProcess::disconnect()
+{
+  // Upon noticing a disconnection with the master, we drain the pending
+  // events in the queue that were waiting to be sent to the scheduler
+  // upon receiving the subscribe call.
+  // It's fine to do so because:
+  // - Any outstanding offers are invalidated by the master upon a scheduler
+  //   (re-)registration.
+  // - Any task status updates could be reconciled by the scheduler.
+  pending = queue<Event>();
+  subscribeCall = false;
+
+  if (heartbeatTimer.isSome()) {
+    Clock::cancel(heartbeatTimer.get());
+    heartbeatTimer = None();
+  }
+}
+
+
+extern "C" {
+
+/*
+ * Class:     org_apache_mesos_v1_scheduler_V0Mesos
+ * Method:    initialize
+ * Signature: ()V
+ *
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_initialize
+  (JNIEnv* env, jobject thiz)
+{
+  jclass clazz = env->GetObjectClass(thiz);
+
+  // Create a weak global reference to the Scheduler
+  // instance (we want a global reference so the GC doesn't collect
+  // the instance but we make it weak so the JVM can exit).
+  jweak jmesos = env->NewWeakGlobalRef(thiz);
+
+  // Get out the FrameworkInfo passed into the constructor.
+  jfieldID framework =
+    env->GetFieldID(clazz, "framework",
+                    "Lorg/apache/mesos/v1/Protos$FrameworkInfo;");
+
+  jobject jframework = env->GetObjectField(thiz, framework);
+
+  // Get out the master passed into the constructor.
+  jfieldID master = env->GetFieldID(clazz, "master", "Ljava/lang/String;");
+  jobject jmaster = env->GetObjectField(thiz, master);
+
+  // Get out the credential passed into the constructor.
+  jfieldID credential =
+    env->GetFieldID(clazz, "credential",
+                    "Lorg/apache/mesos/v1/Protos$Credential;");
+
+  jobject jcredential = env->GetObjectField(thiz, credential);
+
+  Option<Credential> credential_;
+  if (!env->IsSameObject(jcredential, nullptr)) {
+    credential_ = construct<Credential>(env, jcredential);
+  }
+
+  // Create the C++ scheduler and initialize the `__mesos` variable.
+  V0ToV1Adapter* mesos =
+    new V0ToV1Adapter(
+        env,
+        jmesos,
+        devolve(construct<v1::FrameworkInfo>(env, jframework)),
+        construct<string>(env, jmaster),
+        credential_);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+  env->SetLongField(thiz, __mesos, (jlong) mesos);
+}
+
+
+/*
+ * Class:     org_apache_mesos_v1_scheduler_V0Mesos
+ * Method:    finalize
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_finalize
+  (JNIEnv* env, jobject thiz)
+{
+  jclass clazz = env->GetObjectClass(thiz);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+
+  V0ToV1Adapter* mesos =
+    (V0ToV1Adapter*) env->GetLongField(thiz, __mesos);
+
+  env->DeleteWeakGlobalRef(mesos->process->jmesos);
+
+  delete mesos;
+}
+
+
+/*
+ * Class:     org_apache_mesos_v1_scheduler_V0Mesos
+ * Method:    send
+ * Signature: (Lorg/apache/mesos/v1/scheduler/Protos/Call;)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_mesos_v1_scheduler_V0Mesos_send
+  (JNIEnv* env, jobject thiz, jobject jcall)
+{
+  jclass clazz = env->GetObjectClass(thiz);
+
+  jfieldID __mesos = env->GetFieldID(clazz, "__mesos", "J");
+
+  V0ToV1Adapter* mesos =
+    (V0ToV1Adapter*) env->GetLongField(thiz, __mesos);
+
+  mesos->send(construct<Call>(env, jcall));
+}
+
+} // extern "C" {